3 * Copyright (C) <2015> Centricular Ltd
4 * @author: Edward Hervey <edward@centricular.com>
5 * @author: Jan Schmidt <jan@centricular.com>
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
31 #include <gst/pbutils/pbutils.h>
33 #include "gstplaybackelements.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
38 * SECTION:element-decodebin3
41 * #GstBin that auto-magically constructs a decoding pipeline using available
42 * decoders and demuxers via auto-plugging. The output is raw audio, video
43 * or subtitle streams.
45 * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
47 * * supports publication and selection of stream information via
48 * GstStreamCollection messages and #GST_EVENT_SELECT_STREAMS events.
50 * * dynamically switches stream connections internally, and
51 * reuses decoder elements when stream selections change, so that in
52 * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53 * and only creates new elements when streams change and an existing decoder
54 * is not capable of handling the new format.
56 * * supports multiple input pads for the parallel decoding of auxiliary streams
57 * not muxed with the primary stream.
59 * * does not handle network stream buffering. decodebin3 expects that network stream
60 * buffering is handled upstream, before data is passed to it.
62 * > decodebin3 is still experimental API and a technology preview.
63 * > Its behaviour and exposed API are subject to change.
70 * 1) From sink pad to elementary streams (GstParseBin or identity)
72 * Note : If the incoming streams are push-based-only and are compatible with
73 * either the output caps or a potential decoder, the usage of parsebin is
74 * replaced by a simple passthrough identity element.
76 * The input sink pads are fed to GstParseBin. GstParseBin will feed them
77 * through typefind. When the caps are detected (or changed) we recursively
78 * figure out which demuxer, parser or depayloader is needed until we get to
81 * All elementary streams (whether decoded or not, whether exposed or not) are
82 * fed through multiqueue. There is only *one* multiqueue in decodebin3.
84 * => MultiQueue is the cornerstone.
85 * => No buffering before multiqueue
87 * 2) Elementary streams
89 * After GstParseBin, there are 3 main components:
90 * 1) Input Streams (provided by GstParseBin)
94 * Input Streams correspond to the stream coming from GstParseBin and that gets
95 * fed into a multiqueue slot.
97 * Output Streams correspond to the combination of a (optional) decoder and an
98 * output ghostpad. Output Streams can be moved from one multiqueue slot to
99 * another, can reconfigure themselves (different decoders), and can be
100 * added/removed depending on the configuration (all streams outputted, only one
101 * of each type, ...).
103 * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
104 * each 'active' Input Stream there is a corresponding slot.
105 * Slots might have different streams on input and output (due to internal
108 * Due to internal queuing/buffering/..., all those components (might) behave
109 * asynchronously. Therefore probes will be used on each component source pad to
110 * detect various key-points:
112 * the stream is done => Mark that component as done, optionally freeing/removing it
114 * a new stream is starting => link it further if needed
116 * 3) Gradual replacement
118 * If the caps change at any point in decodebin (input sink pad, demuxer output,
119 * multiqueue output, ..), we gradually replace (if needed) the following elements.
121 * This is handled by the probes in various locations:
123 * b) multiqueue input (source pad of Input Streams)
124 * c) multiqueue output (source pad of Multiqueue Slots)
125 * d) final output (target of source ghostpads)
127 * When a CAPS event arrives at those points, one of three things can happen:
128 * a) There are no elements downstream yet, just create/link-to following elements
129 * b) There are downstream elements, do an ACCEPT_CAPS query
130 * b.1) The new CAPS are accepted, keep current configuration
131 * b.2) The new CAPS are not accepted, remove following elements then do a)
136 * Input(s) Slots Streams
137 * /-------------------------------------------\ /-----\ /------------- \
139 * +-------------------------------------------------------------------------+
141 * | +---------------------------------------------+ |
142 * | | GstParseBin(s) | |
143 * | | +--------------+ | +-----+ |
144 * | | | |---[parser]-[|--| Mul |---[ decoder ]-[|
145 * |]--[ typefind ]---| demuxer(s) |------------[| | ti | |
146 * | | | (if needed) |---[parser]-[|--| qu | |
147 * | | | |---[parser]-[|--| eu |---[ decoder ]-[|
148 * | | +--------------+ | +------ ^ |
149 * | +---------------------------------------------+ ^ | |
151 * +-----------------------------------------------+--------+-------------+--+
154 * Probes --/--------/-------------/
158 * We want to ensure we re-use decoders when switching streams. This takes place
159 * at the multiqueue output level.
162 * 1) Activating a stream (i.e. linking a slot to an output) is only done within
163 * the streaming thread in the multiqueue_src_probe() and only if the
164 * stream is in the REQUESTED selection.
165 * 2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
166 * within the stream thread, but only in a purposefully called IDLE probe
167 * that calls reassign_slot().
169 * Based on those two principles, 3 "selections" of streams (stream-id) are used:
170 * 1) requested_selection
171 * All streams within that list should be activated
172 * 2) active_selection
173 * List of streams that are exposed by decodebin
175 * List of streams that will be moved to requested_selection in the
176 * reassign_slot() method (i.e. once a stream was deactivated, and the output
/* Debug category of this element. GST_CAT_DEFAULT makes it the category used
 * by the GST_*_OBJECT logging macros called throughout this file. */
181 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
182 #define GST_CAT_DEFAULT decodebin3_debug
/* GType of the decodebin3 element, obtained from gst_decodebin3_get_type(). */
184 #define GST_TYPE_DECODEBIN3 (gst_decodebin3_get_type ())
/* NOTE(review): no use of EXTRA_DEBUG is visible in this part of the file;
 * presumably it gates additional logging elsewhere - confirm. */
186 #define EXTRA_DEBUG 1
/* Quark (created on first use by _custom_final_eos_quark_get()) and a string
 * key that both name the "custom final EOS" marker. How they are attached to
 * events or objects is not visible in this part of the file. */
188 #define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
189 #define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"
/* Accessor behind CUSTOM_FINAL_EOS_QUARK. The quark for the static string
 * "decodebin3-custom-final-eos" is created inside a
 * g_once_init_enter()/g_once_init_leave() pair and cached in the
 * function-local static g_quark, so the string is only interned once. */
191 _custom_final_eos_quark_get (void)
193 static gsize g_quark;
195 if (g_once_init_enter (&g_quark)) {
197 (gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
198 g_once_init_leave (&g_quark, quark);
/* Forward typedefs for the element instance/class structures and for the
 * per-input, per-input-stream and per-output-stream bookkeeping structures
 * defined further down. */
203 typedef struct _GstDecodebin3 GstDecodebin3;
204 typedef struct _GstDecodebin3Class GstDecodebin3Class;
206 typedef struct _DecodebinInputStream DecodebinInputStream;
207 typedef struct _DecodebinInput DecodebinInput;
208 typedef struct _DecodebinOutputStream DecodebinOutputStream;
/* Instance structure of the decodebin3 element. The fields are grouped by the
 * mutex that guards them (input_lock, selection_lock, factories_lock); the
 * existing begin/end marker comments delimit each group. */
210 struct _GstDecodebin3
214 /* input_lock protects the following variables */
216 /* Main input (static sink pad) */
217 DecodebinInput *main_input;
218 /* Supplementary input (request sink pads) */
220 /* counter for input */
221 guint32 input_counter;
222 /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
223 /* FIXME : Needs to be reset appropriately (when upstream changes ?) */
224 guint32 current_group_id;
225 /* End of variables protected by input_lock */
/* The multiqueue created in gst_decodebin3_init(). default_mq_min_interleave
 * is the element's own "min-interleave-time" value read back at creation;
 * current_mq_min_interleave starts out equal to it. */
227 GstElement *multiqueue;
228 GstClockTime default_mq_min_interleave;
229 GstClockTime current_mq_min_interleave;
231 /* selection_lock protects access to following variables */
232 GMutex selection_lock;
233 GList *input_streams; /* List of DecodebinInputStream for active collection */
234 GList *output_streams; /* List of DecodebinOutputStream used for output */
235 GList *slots; /* List of MultiQueueSlot */
238 /* Active collection */
239 GstStreamCollection *collection;
240 /* requested selection of stream-id to activate post-multiqueue */
241 GList *requested_selection;
242 /* list of stream-id currently activated in output */
243 GList *active_selection;
244 /* List of stream-id that need to be activated (after a stream switch for ex) */
246 /* Pending select streams event */
247 guint32 select_streams_seqnum;
248 /* pending list of streams to select (from downstream) */
249 GList *pending_select_streams;
250 /* TRUE if requested_selection was updated, will become FALSE once
251 * it has fully transitioned to active */
252 gboolean selection_updated;
253 /* End of variables protected by selection_lock */
/* NOTE(review): declared after the end-of-selection_lock marker above, so
 * presumably not covered by that lock - confirm which lock, if any, guards
 * it. */
254 gboolean upstream_selected;
256 /* List of pending collections.
257 * FIXME : Is this really needed ? */
258 GList *pending_collection;
/* Guards the factory lists below; factories_cookie presumably tracks the
 * registry state the lists were built from - confirm against
 * gst_decode_bin_update_factories_list(). */
261 GMutex factories_lock;
262 guint32 factories_cookie;
263 /* All DECODABLE factories */
265 /* Only DECODER factories */
266 GList *decoder_factories;
267 /* DECODABLE but not DECODER factories */
268 GList *decodable_factories;
270 /* counters for pads */
271 guint32 apadcount, vpadcount, tpadcount, opadcount;
/* Class structure of the decodebin3 element. select_stream is the class
 * handler slot of the "select-stream" signal (registered through
 * G_STRUCT_OFFSET in gst_decodebin3_class_init()). */
277 struct _GstDecodebin3Class
281 gint (*select_stream) (GstDecodebin3 * dbin,
282 GstStreamCollection * collection, GstStream * stream);
285 /* Input of decodebin, controls input pad and parsebin */
/* Per-sink-pad state: one instance for the main input and one per request
 * sink pad. Holds the element that processes the pad's data (parsebin or
 * identity) and the signal handler ids connected on parsebin. */
286 struct _DecodebinInput
/* "sink" pad of parsebin, fetched in ensure_input_parsebin() and used as the
 * target of the input's ghost pad. */
293 GstPad *parsebin_sink;
295 GstStreamCollection *collection; /* Active collection */
296 gboolean upstream_selected;
300 /* Either parsebin or identity is used */
301 GstElement *parsebin;
302 GstElement *identity;
/* Handler ids of the parsebin "pad-added", "pad-removed" and "drained"
 * connections, kept so they can be disconnected when the input is released
 * or freed. */
304 gulong pad_added_sigid;
305 gulong pad_removed_sigid;
306 gulong drained_sigid;
308 /* TRUE if the input got drained
309 * FIXME : When do we reset it if re-used ?
314 /* Multiqueue Slots */
/* One multiqueue slot, i.e. a sink/src pad pair of the multiqueue, together
 * with the input stream feeding it and the output stream (if any) currently
 * linked to it. */
315 typedef struct _MultiQueueSlot
320 /* Type of stream handled by this slot */
323 /* Linked input and output */
324 DecodebinInputStream *input;
326 /* pending => last stream received on sink pad */
327 GstStream *pending_stream;
328 /* active => last stream outputted on source pad */
329 GstStream *active_stream;
/* The multiqueue pad pair backing this slot */
331 GstPad *sink_pad, *src_pad;
333 /* id of the MQ src_pad event probe */
338 DecodebinOutputStream *output;
341 /* Streams that are exposed downstream (i.e. output) */
/* State of one exposed output: the slot it is currently fed from, the
 * optional decoder placed between the slot and the output, and bookkeeping
 * about the exposed pad. */
342 struct _DecodebinOutputStream
345 /* The type of stream handled by this output stream */
348 /* The slot to which this output stream is currently connected to */
349 MultiQueueSlot *slot;
351 GstElement *decoder; /* Optional */
/* Pads of the decoder above; presumably unset when no decoder is used -
 * confirm. */
352 GstPad *decoder_sink, *decoder_src;
357 /* Flag if ghost pad is exposed */
358 gboolean src_exposed;
360 /* Reported decoder latency */
361 GstClockTime decoder_latency;
363 /* keyframe dropping probe */
364 gulong drop_probe_id;
/* Signal indices; they index gst_decodebin3_signals[], which is filled with
 * the ids returned by g_signal_new() in gst_decodebin3_class_init(). */
377 SIGNAL_SELECT_STREAM,
378 SIGNAL_ABOUT_TO_FINISH,
381 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
/* Lock helpers for dbin->selection_lock and dbin->input_lock. Each macro
 * wraps g_mutex_lock()/g_mutex_unlock() and emits GST_LOG_OBJECT trace
 * messages (before and after acquiring, and before releasing) whose format
 * string carries a thread pointer. */
383 #define SELECTION_LOCK(dbin) G_STMT_START { \
384 GST_LOG_OBJECT (dbin, \
385 "selection locking from thread %p", \
387 g_mutex_lock (&dbin->selection_lock); \
388 GST_LOG_OBJECT (dbin, \
389 "selection locked from thread %p", \
393 #define SELECTION_UNLOCK(dbin) G_STMT_START { \
394 GST_LOG_OBJECT (dbin, \
395 "selection unlocking from thread %p", \
397 g_mutex_unlock (&dbin->selection_lock); \
400 #define INPUT_LOCK(dbin) G_STMT_START { \
401 GST_LOG_OBJECT (dbin, \
402 "input locking from thread %p", \
404 g_mutex_lock (&dbin->input_lock); \
405 GST_LOG_OBJECT (dbin, \
406 "input locked from thread %p", \
410 #define INPUT_UNLOCK(dbin) G_STMT_START { \
411 GST_LOG_OBJECT (dbin, \
412 "input unlocking from thread %p", \
414 g_mutex_unlock (&dbin->input_lock); \
/* GObject type boilerplate: GstDecodebin3 derives from GstBin, and
 * parent_class is aliased to the name the rest of the file uses when
 * chaining up. */
417 GType gst_decodebin3_get_type (void);
418 #define gst_decodebin3_parent_class parent_class
419 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
/* Registration-time initialisation (debug category plus the shared playback
 * element init), passed as _do_init to the element registration macro
 * below. */
421 GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");\
422 playback_element_init (plugin);
/* Registers the element under the name "decodebin3" with GST_RANK_NONE. */
423 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (decodebin3, "decodebin3", GST_RANK_NONE,
424 GST_TYPE_DECODEBIN3, _do_init);
/* Caps used as the initial value of dbin->caps in gst_decodebin3_init(). */
426 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
/* Static pad templates, all with ANY caps: the "sink" pad, "sink_%u" request
 * pads, and the "video_%u" / "audio_%u" / "text_%u" / "src_%u" output pads.
 * All six are added to the element class in gst_decodebin3_class_init(). */
428 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
431 GST_STATIC_CAPS_ANY);
433 static GstStaticPadTemplate request_sink_template =
434 GST_STATIC_PAD_TEMPLATE ("sink_%u",
437 GST_STATIC_CAPS_ANY);
439 static GstStaticPadTemplate video_src_template =
440 GST_STATIC_PAD_TEMPLATE ("video_%u",
443 GST_STATIC_CAPS_ANY);
445 static GstStaticPadTemplate audio_src_template =
446 GST_STATIC_PAD_TEMPLATE ("audio_%u",
449 GST_STATIC_CAPS_ANY);
451 static GstStaticPadTemplate text_src_template =
452 GST_STATIC_PAD_TEMPLATE ("text_%u",
455 GST_STATIC_CAPS_ANY);
457 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
460 GST_STATIC_CAPS_ANY);
/* Forward declarations of the GObject vfunc implementations installed in
 * gst_decodebin3_class_init(). */
463 static void gst_decodebin3_dispose (GObject * object);
464 static void gst_decodebin3_finalize (GObject * object);
465 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
466 const GValue * value, GParamSpec * pspec);
467 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
468 GValue * value, GParamSpec * pspec);
/* Handler connected to parsebin's "autoplug-continue" signal in
 * ensure_input_parsebin(). */
470 static gboolean parsebin_autoplug_continue_cb (GstElement *
471 parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
/* Default class handler of the "select-stream" signal. It only logs; per its
 * log message it answers -1, which the signal documentation describes as
 * "let decodebin decide". The return statement itself is not visible in this
 * listing. */
474 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
475 GstStreamCollection * collection, GstStream * stream)
477 GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
/* Forward declarations for the GstElement/GstBin vfunc implementations and
 * for the internal helpers, grouped as: element/bin entry points, factory
 * lookup, input handling, output streams, multiqueue slots, and collection /
 * selection handling. */
482 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
483 GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
484 static void gst_decodebin3_release_pad (GstElement * element, GstPad * pad);
485 static void handle_stream_collection (GstDecodebin3 * dbin,
486 GstStreamCollection * collection, DecodebinInput * input);
487 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
488 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
489 GstStateChange transition);
490 static gboolean gst_decodebin3_send_event (GstElement * element,
493 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
495 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
496 GstElementFactoryListType ftype);
499 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
500 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
501 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
503 static void reconfigure_output_stream (DecodebinOutputStream * output,
504 MultiQueueSlot * slot);
505 static void free_output_stream (GstDecodebin3 * dbin,
506 DecodebinOutputStream * output);
507 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
510 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
511 GstPadProbeInfo * info, MultiQueueSlot * slot);
512 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
513 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
514 DecodebinInputStream * input);
515 static void link_input_to_slot (DecodebinInputStream * input,
516 MultiQueueSlot * slot);
517 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
518 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
519 MultiQueueSlot * slot);
521 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
522 static void update_requested_selection (GstDecodebin3 * dbin);
/* The parsebin-related helpers live in a separate .c file that is textually
 * included here, so they share this file's static declarations. Functions
 * such as parsebin_pad_added_cb(), find_input_stream_for_pad() and
 * remove_input_stream(), used below without a visible definition, presumably
 * come from it - confirm. */
524 /* FIXME: Really make all the parser stuff a self-contained helper object */
525 #include "gstdecodebin3-parse.c"
/* Signal accumulator used for "select-stream": copies the integer returned by
 * the handler that just ran into the emission's accumulated return value.
 * The condition under which emission continues or stops (this function's own
 * return value) is not visible in this listing. */
528 _gst_int_accumulator (GSignalInvocationHint * ihint,
529 GValue * return_accu, const GValue * handler_return, gpointer dummy)
531 gint res = g_value_get_int (handler_return);
533 g_value_set_int (return_accu, res);
/* Class initialiser. Installs the GObject vfuncs and the "caps" property,
 * registers the "select-stream" and "about-to-finish" signals, sets the
 * GstElement and GstBin vfuncs, adds the six static pad templates and the
 * element metadata, and finally sets the default "select-stream" class
 * handler. */
542 gst_decodebin3_class_init (GstDecodebin3Class * klass)
544 GObjectClass *gobject_klass = (GObjectClass *) klass;
545 GstElementClass *element_class = (GstElementClass *) klass;
546 GstBinClass *bin_klass = (GstBinClass *) klass;
548 gobject_klass->dispose = gst_decodebin3_dispose;
549 gobject_klass->finalize = gst_decodebin3_finalize;
550 gobject_klass->set_property = gst_decodebin3_set_property;
551 gobject_klass->get_property = gst_decodebin3_get_property;
553 /* FIXME : ADD PROPERTIES ! */
554 g_object_class_install_property (gobject_klass, PROP_CAPS,
555 g_param_spec_boxed ("caps", "Caps",
556 "The caps on which to stop decoding. (NULL = default)",
557 GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
559 /* FIXME : ADD SIGNALS ! */
561 * GstDecodebin3::select-stream
562 * @decodebin: a #GstDecodebin3
563 * @collection: a #GstStreamCollection
564 * @stream: a #GstStream
566 * This signal is emitted whenever @decodebin needs to decide whether
567 * to expose a @stream of a given @collection.
569 * Note that the prefered way to select streams is to listen to
570 * GST_MESSAGE_STREAM_COLLECTION on the bus and send a
571 * GST_EVENT_SELECT_STREAMS with the streams the user wants.
573 * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
574 * A value of -1 (default) lets @decodebin decide what to do with the stream.
/* RUN_LAST signal returning an int; handler results are combined by
 * _gst_int_accumulator and the class handler slot is
 * GstDecodebin3Class.select_stream. */
576 gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
577 g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
578 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
579 _gst_int_accumulator, NULL, NULL,
580 G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
583 * GstDecodebin3::about-to-finish:
585 * This signal is emitted when the data for the selected URI is
586 * entirely buffered and it is safe to specify another URI.
/* No class handler, no accumulator, no arguments, no return value. */
588 gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
589 g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
590 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
593 element_class->request_new_pad =
594 GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
595 element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
596 element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
597 element_class->release_pad = GST_DEBUG_FUNCPTR (gst_decodebin3_release_pad);
599 gst_element_class_add_pad_template (element_class,
600 gst_static_pad_template_get (&sink_template));
601 gst_element_class_add_pad_template (element_class,
602 gst_static_pad_template_get (&request_sink_template));
603 gst_element_class_add_pad_template (element_class,
604 gst_static_pad_template_get (&video_src_template));
605 gst_element_class_add_pad_template (element_class,
606 gst_static_pad_template_get (&audio_src_template));
607 gst_element_class_add_pad_template (element_class,
608 gst_static_pad_template_get (&text_src_template));
609 gst_element_class_add_pad_template (element_class,
610 gst_static_pad_template_get (&src_template));
612 gst_element_class_set_static_metadata (element_class,
613 "Decoder Bin 3", "Generic/Bin/Decoder",
614 "Autoplug and decode to raw media",
615 "Edward Hervey <edward@centricular.com>");
/* GstBin vfunc that receives messages posted by child elements */
617 bin_klass->handle_message = gst_decodebin3_handle_message;
/* Default handler for "select-stream" (logs and defers the decision) */
619 klass->select_stream = gst_decodebin3_select_stream;
/* Instance initialiser: creates the main input, creates and configures the
 * single multiqueue and adds it to the bin, resets the group id, initialises
 * the three mutexes, sets the default target caps and flags the bin as
 * streams-aware. */
623 gst_decodebin3_init (GstDecodebin3 * dbin)
625 /* Create main input */
/* NOTE(review): this runs before the g_mutex_init() calls below;
 * create_new_input() is not visible here, so confirm it does not take
 * input_lock or selection_lock. */
626 dbin->main_input = create_new_input (dbin, TRUE);
/* NOTE(review): the factory result is used below without a NULL check;
 * confirm multiqueue is guaranteed to be available. */
628 dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
/* Remember multiqueue's own default so it can be used as the baseline for
 * current_mq_min_interleave. */
629 g_object_get (dbin->multiqueue, "min-interleave-time",
630 &dbin->default_mq_min_interleave, NULL);
631 dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
632 g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
633 "max-size-buffers", 0, "use-interleave", TRUE, NULL);
634 gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
636 dbin->current_group_id = GST_GROUP_ID_INVALID;
638 g_mutex_init (&dbin->factories_lock);
639 g_mutex_init (&dbin->selection_lock);
640 g_mutex_init (&dbin->input_lock);
/* Default value of the "caps" property */
642 dbin->caps = gst_static_caps_get (&default_raw_caps);
644 GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
/* GObject dispose: releases the factory lists, the selection lists, the
 * active collection and every input, then chains up to the parent class.
 * Pointers are reset to NULL after release. */
648 gst_decodebin3_dispose (GObject * object)
650 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
/* "factories" is released with gst_plugin_feature_list_free(); the two
 * derived lists are released as plain list containers only. */
653 if (dbin->factories) {
654 gst_plugin_feature_list_free (dbin->factories);
655 dbin->factories = NULL;
657 if (dbin->decoder_factories) {
658 g_list_free (dbin->decoder_factories);
659 dbin->decoder_factories = NULL;
661 if (dbin->decodable_factories) {
662 g_list_free (dbin->decodable_factories);
663 dbin->decodable_factories = NULL;
/* requested_selection and active_selection free their elements with
 * g_free(); to_activate and pending_select_streams free the list nodes only.
 * NOTE(review): confirm the latter two never own their elements. */
665 g_list_free_full (dbin->requested_selection, g_free);
666 g_list_free_full (dbin->active_selection, g_free);
667 g_list_free (dbin->to_activate);
668 g_list_free (dbin->pending_select_streams);
670 dbin->requested_selection = NULL;
671 dbin->active_selection = NULL;
672 dbin->to_activate = NULL;
673 dbin->pending_select_streams = NULL;
675 g_clear_object (&dbin->collection);
/* NOTE(review): free_input() is annotated "Call with INPUT LOCK" at its
 * definition, but no INPUT_LOCK is visible in this function - confirm this
 * is intentional at dispose time. */
677 if (dbin->main_input) {
678 free_input (dbin, dbin->main_input);
679 dbin->main_input = NULL;
/* "next" is fetched before the current link is deleted so the walk survives
 * the removal. */
682 for (walk = dbin->other_inputs; walk; walk = next) {
683 DecodebinInput *input = walk->data;
685 next = g_list_next (walk);
687 free_input (dbin, input);
688 dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
691 G_OBJECT_CLASS (parent_class)->dispose (object);
/* GObject finalize: clears the three mutexes initialised in
 * gst_decodebin3_init() and chains up to the parent class. */
695 gst_decodebin3_finalize (GObject * object)
697 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
699 g_mutex_clear (&dbin->factories_lock);
700 g_mutex_clear (&dbin->selection_lock);
701 g_mutex_clear (&dbin->input_lock);
703 G_OBJECT_CLASS (parent_class)->finalize (object);
/* GObject set_property. The only property handled in the visible code is the
 * caps one: under the object lock the previous caps are released and
 * replaced by a copy of the boxed value. Any other id is reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID. */
707 gst_decodebin3_set_property (GObject * object, guint prop_id,
708 const GValue * value, GParamSpec * pspec)
710 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
712 /* FIXME : IMPLEMENT */
715 GST_OBJECT_LOCK (dbin);
717 gst_caps_unref (dbin->caps);
/* NOTE(review): the property blurb says "NULL = default", but the value is
 * stored as duplicated; if NULL is passed dbin->caps presumably becomes NULL
 * rather than the default raw caps - confirm readers cope with that. */
718 dbin->caps = g_value_dup_boxed (value);
719 GST_OBJECT_UNLOCK (dbin);
722 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject get_property. Copies dbin->caps into the GValue while holding the
 * object lock; any other property id is reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID. */
728 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
731 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
733 /* FIXME : IMPLEMENT */
736 GST_OBJECT_LOCK (dbin);
737 g_value_set_boxed (value, dbin->caps);
738 GST_OBJECT_UNLOCK (dbin);
741 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* "autoplug-continue" handler connected on every parsebin. Tests the caps of
 * the new pad against the element's target caps (dbin->caps); the values
 * returned for each outcome are not visible in this listing.
 * NOTE(review): dbin->caps is read here without GST_OBJECT_LOCK, whereas
 * set_property replaces (and unrefs) it under that lock - confirm this
 * cannot race with a property change from another thread. */
747 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
748 GstCaps * caps, GstDecodebin3 * dbin)
750 GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
752 /* If it matches our target caps, expose it */
753 if (gst_caps_can_intersect (caps, dbin->caps))
759 /* This method should be called whenever a STREAM_START event
760 * comes out of a given parsebin.
761 * The caller shall replace the group_id if the function returns TRUE */
/* Records *group_id on the input and reconciles it with the element-wide
 * dbin->current_group_id.
 *
 * @input: the input on which the STREAM_START was seen
 * @group_id: in/out; may be overwritten with dbin->current_group_id
 *
 * The returned gboolean tells the caller whether to replace the group id
 * (per the comment preceding this function); the return statements are not
 * visible in this listing. */
763 set_input_group_id (DecodebinInput * input, guint32 * group_id)
765 GstDecodebin3 *dbin = input->dbin;
/* Track the per-input id; a change from a previously valid id is only
 * warned about, not rejected. */
767 if (input->group_id != *group_id) {
768 if (input->group_id != GST_GROUP_ID_INVALID)
769 GST_WARNING_OBJECT (dbin,
770 "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
771 ") on input %p ", input->group_id, *group_id, input);
772 input->group_id = *group_id;
775 if (*group_id != dbin->current_group_id) {
776 /* The input is being re-used with a different incoming stream, we do want
777 * to change/unify to this new group-id */
/* First valid id seen becomes the global one; afterwards the caller's id is
 * rewritten to the global one. */
778 if (dbin->current_group_id == GST_GROUP_ID_INVALID) {
779 GST_DEBUG_OBJECT (dbin,
780 "Setting current group id to %" G_GUINT32_FORMAT, *group_id);
781 dbin->current_group_id = *group_id;
783 *group_id = dbin->current_group_id;
/* "drained" handler of a parsebin. Marks the corresponding input as drained,
 * then ANDs the drained flag of the main input with that of every other
 * input; the "about-to-finish" signal emission below is tied to that
 * combined result.
 * NOTE(review): main_input/other_inputs are listed as protected by
 * input_lock in the instance structure, but no lock is visibly taken here -
 * confirm. */
791 parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
793 GstDecodebin3 *dbin = input->dbin;
794 gboolean all_drained;
797 GST_INFO_OBJECT (dbin, "input %p drained", input);
798 input->drained = TRUE;
800 all_drained = dbin->main_input->drained;
801 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
802 DecodebinInput *data = (DecodebinInput *) tmp->data;
804 all_drained &= data->drained;
808 GST_INFO_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
809 g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
814 /* Call with INPUT_LOCK taken */
/* Makes sure @input has a parsebin that is a child of @dbin and is the target
 * of the input's ghost sink pad.
 *
 * On first use the parsebin is created, its "sink" pad is cached and the
 * "pad-added", "pad-removed", "drained" and "autoplug-continue" signals are
 * connected. If the "parsebin" factory is unavailable, a missing-element
 * message is posted on the bus. The gboolean result is tested by the caller
 * as success/failure; the return statements are not visible in this
 * listing. */
816 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
818 gboolean set_state = FALSE;
820 if (input->parsebin == NULL) {
821 input->parsebin = gst_element_factory_make ("parsebin", NULL);
822 if (input->parsebin == NULL)
/* Take an extra reference; presumably so the pointer stays valid while the
 * element is outside the bin - confirm against the unref in
 * release_pad/free_input. */
824 input->parsebin = gst_object_ref (input->parsebin);
825 input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
826 input->pad_added_sigid =
827 g_signal_connect (input->parsebin, "pad-added",
828 (GCallback) parsebin_pad_added_cb, input);
829 input->pad_removed_sigid =
830 g_signal_connect (input->parsebin, "pad-removed",
831 (GCallback) parsebin_pad_removed_cb, input);
832 input->drained_sigid =
833 g_signal_connect (input->parsebin, "drained",
834 (GCallback) parsebin_drained_cb, input);
/* NOTE(review): unlike the three connections above, this handler id is not
 * stored, and the teardown paths only disconnect the stored three. */
835 g_signal_connect (input->parsebin, "autoplug-continue",
836 (GCallback) parsebin_autoplug_continue_cb, dbin);
839 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
840 /* The state lock is taken so that we ensure we are the one (de)activating
841 * parsebin. We need to do this to ensure any activation taking place in
842 * parsebin (including by elements doing upstream activation) are done
843 * within the same thread. */
844 GST_STATE_LOCK (input->parsebin);
845 gst_bin_add (GST_BIN (dbin), input->parsebin);
849 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
850 input->parsebin_sink);
853 gst_element_sync_state_with_parent (input->parsebin);
854 GST_STATE_UNLOCK (input->parsebin);
/* Failure path: tell the application that the "parsebin" element is
 * missing. */
862 gst_element_post_message ((GstElement *) dbin,
863 gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
/* Pad link function of an input (sink) pad.
 *
 * Issues a SCHEDULING query on @peer to find out whether upstream supports
 * seekable pull mode, and logs the result in pull_mode. The parsebin set-up
 * below refuses the link when parsebin cannot be ensured or when the input
 * already uses an identity element.
 *
 * Returns: GST_PAD_LINK_OK by default, GST_PAD_LINK_REFUSED when no
 * DecodebinInput is attached to @pad or when the parsebin set-up fails. */
868 static GstPadLinkReturn
869 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
871 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
873 gboolean pull_mode = FALSE;
874 GstPadLinkReturn res = GST_PAD_LINK_OK;
/* The owning DecodebinInput is stored as object data on the pad */
875 DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
877 g_return_val_if_fail (input, GST_PAD_LINK_REFUSED);
879 GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT, pad);
881 query = gst_query_new_scheduling ();
882 if (gst_pad_query (peer, query)
883 && gst_query_has_scheduling_mode_with_flags (query, GST_PAD_MODE_PULL,
884 GST_SCHEDULING_FLAG_SEEKABLE))
886 gst_query_unref (query);
888 GST_DEBUG_OBJECT (dbin, "Upstream can do pull-based : %d", pull_mode);
890 /* If upstream *can* do pull-based, we always use a parsebin. If not, we will
891 * delay that decision to a later stage (caps/stream/collection event
892 * processing) to figure out if one is really needed or whether an identity
893 * element will be enough */
896 if (!ensure_input_parsebin (dbin, input))
897 res = GST_PAD_LINK_REFUSED;
898 else if (input->identity) {
899 GST_ERROR_OBJECT (dbin,
900 "Can't reconfigure input from push-based to pull-based");
901 res = GST_PAD_LINK_REFUSED;
909 /* Drop duration query during _input_pad_unlink */
/* Pad probe installed on parsebin's sink pad in gst_decodebin3_release_pad().
 * DURATION queries are answered with GST_PAD_PROBE_HANDLED so they are not
 * forwarded; everything else passes through with GST_PAD_PROBE_OK.
 * @input is not used in the visible lines. */
910 static GstPadProbeReturn
911 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
912 DecodebinInput * input)
914 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
916 if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
917 GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
918 if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
919 GST_LOG_OBJECT (pad, "stop forwarding query duration");
920 ret = GST_PAD_PROBE_HANDLED;
/* Recomputes dbin->current_group_id from the inputs. Starts from the main
 * input's group id and compares it against every other input's id; what
 * happens on a mismatch is not visible in this listing (presumably the
 * update is skipped - confirm). Otherwise the common id becomes the global
 * one. */
928 recalculate_group_id (GstDecodebin3 * dbin)
930 guint32 common_group_id;
933 common_group_id = dbin->main_input->group_id;
935 for (iter = dbin->other_inputs; iter; iter = iter->next) {
936 DecodebinInput *input = iter->data;
938 if (input->group_id != common_group_id)
942 GST_DEBUG_OBJECT (dbin, "Updating global group_id to %" G_GUINT32_FORMAT,
944 dbin->current_group_id = common_group_id;
947 /* CALL with INPUT LOCK */
/* Resets the parsebin of @input so it can be re-used: with the element's
 * state lock held, parsebin is set to NULL, the input's drained flag and
 * group id are cleared, the global group id is recalculated, every input
 * stream that belongs to this input is flagged as having seen EOS, and
 * parsebin is brought back in sync with the parent's state. Does nothing
 * further when the input has no parsebin. */
949 reset_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
953 if (input->parsebin == NULL)
956 GST_DEBUG_OBJECT (dbin, "Resetting %" GST_PTR_FORMAT, input->parsebin);
958 GST_STATE_LOCK (dbin);
959 gst_element_set_state (input->parsebin, GST_STATE_NULL);
960 input->drained = FALSE;
961 input->group_id = GST_GROUP_ID_INVALID;
962 recalculate_group_id (dbin);
/* NOTE(review): input_streams is listed as protected by selection_lock in
 * the instance structure, but no SELECTION_LOCK is visible here - confirm. */
963 for (iter = dbin->input_streams; iter; iter = iter->next) {
964 DecodebinInputStream *istream = iter->data;
965 if (istream->input == input)
966 istream->saw_eos = TRUE;
968 gst_element_sync_state_with_parent (input->parsebin);
969 GST_STATE_UNLOCK (dbin);
/* Unlink handler of an input (sink) pad. When the input has a parsebin and
 * the pad was operating in pull mode, the parsebin is reset so the input can
 * be re-used.
 * NOTE(review): input->dbin is read in the initialiser below before the
 * g_return_if_fail (input) guard runs, so the guard cannot protect against a
 * NULL @input - the dereference should move after the check. */
974 gst_decodebin3_input_pad_unlink (GstPad * pad, GstPad * peer,
975 DecodebinInput * input)
977 GstDecodebin3 *dbin = input->dbin;
979 g_return_if_fail (input);
981 GST_LOG_OBJECT (dbin, "Got unlink on input pad %" GST_PTR_FORMAT, pad);
984 if (input->parsebin == NULL) {
989 if (GST_PAD_MODE (pad) == GST_PAD_MODE_PULL) {
990 GST_DEBUG_OBJECT (dbin, "Resetting parsebin since it's pull-based");
991 reset_input_parsebin (dbin, input);
/* GstElement release_pad implementation for the sink pads.
 *
 * Steps, in order:
 *  1. drop the input's own stream collection;
 *  2. recompute the merged collection and, if it differs from the current
 *     one, install it, invalidate the pending select-streams seqnum, post a
 *     stream-collection message and update the requested selection;
 *  3. detach the ghost pad target and tear down parsebin and/or identity;
 *  4. for non-main inputs, remove the input from other_inputs and free it.
 * The main input itself is kept. */
998 gst_decodebin3_release_pad (GstElement * element, GstPad * pad)
1000 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1001 DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
1002 GstStreamCollection *collection = NULL;
1003 gulong probe_id = 0;
1006 g_return_if_fail (input);
1007 GST_LOG_OBJECT (dbin, "Releasing pad %" GST_PTR_FORMAT, pad);
1011 /* Clear stream-collection corresponding to current INPUT and post new
1012 * stream-collection message, if needed */
1013 if (input->collection) {
1014 gst_object_unref (input->collection);
1015 input->collection = NULL;
1018 SELECTION_LOCK (dbin);
1019 collection = get_merged_collection (dbin);
1021 SELECTION_UNLOCK (dbin);
/* Same collection object as before: nothing to announce, drop the extra
 * reference obtained from get_merged_collection(). */
1024 if (collection == dbin->collection) {
1025 SELECTION_UNLOCK (dbin);
1026 gst_object_unref (collection);
1030 GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
1032 if (dbin->collection)
1033 gst_object_unref (dbin->collection);
1034 dbin->collection = collection;
1035 dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1038 gst_message_new_stream_collection ((GstObject *) dbin, dbin->collection);
1040 if (input->parsebin)
1041 /* Drop duration queries that the application might be doing while this message is posted */
1042 probe_id = gst_pad_add_probe (input->parsebin_sink,
1043 GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
1044 (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
/* The message is posted only after the selection lock has been released */
1046 SELECTION_UNLOCK (dbin);
1047 gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
1048 update_requested_selection (dbin);
1050 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
1051 if (input->parsebin) {
1052 gst_bin_remove (GST_BIN (dbin), input->parsebin);
1053 gst_element_set_state (input->parsebin, GST_STATE_NULL);
1054 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
1055 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
1056 g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
/* NOTE(review): probe_id is still 0 on any path that did not install the
 * duration-drop probe above; confirm gst_pad_remove_probe() is not reached
 * with id 0, or guard the call. */
1057 gst_pad_remove_probe (input->parsebin_sink, probe_id);
1058 gst_object_unref (input->parsebin);
1059 gst_object_unref (input->parsebin_sink);
1061 input->parsebin = NULL;
1062 input->parsebin_sink = NULL;
/* Identity-based input: remove the input stream attached to identity's src
 * pad before shutting the element down and removing it from the bin. */
1064 if (input->identity) {
1065 GstPad *idpad = gst_element_get_static_pad (input->identity, "src");
1066 DecodebinInputStream *stream = find_input_stream_for_pad (dbin, idpad);
1067 gst_object_unref (idpad);
1068 remove_input_stream (dbin, stream);
1069 gst_element_set_state (input->identity, GST_STATE_NULL);
1070 gst_bin_remove (GST_BIN (dbin), input->identity);
1071 gst_object_unref (input->identity);
1072 input->identity = NULL;
/* parsebin and identity were cleared above, so free_input() only has the
 * ghost pad and the remaining fields left to release. */
1075 if (!input->is_main) {
1076 dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
1077 free_input (dbin, input);
1081 INPUT_UNLOCK (dbin);
1085 /* Call with INPUT LOCK */
1087 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
1089 GST_DEBUG ("Freeing input %p", input);
1091 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
1092 gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
1093 if (input->parsebin) {
1094 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
1095 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
1096 g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
1097 gst_element_set_state (input->parsebin, GST_STATE_NULL);
1098 gst_object_unref (input->parsebin);
1099 gst_object_unref (input->parsebin_sink);
1101 if (input->identity) {
1102 GstPad *idpad = gst_element_get_static_pad (input->identity, "src");
1103 DecodebinInputStream *stream = find_input_stream_for_pad (dbin, idpad);
1104 gst_object_unref (idpad);
1105 remove_input_stream (dbin, stream);
1106 gst_element_set_state (input->identity, GST_STATE_NULL);
1107 gst_object_unref (input->identity);
1109 if (input->collection)
1110 gst_object_unref (input->collection);
1115 sink_query_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstQuery * query)
1117 DecodebinInput *input =
1118 g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1120 g_return_val_if_fail (input, FALSE);
1122 GST_DEBUG_OBJECT (sinkpad, "query %" GST_PTR_FORMAT, query);
1124 /* We accept any caps, since we will reconfigure ourself internally if the new
1125 * stream is incompatible */
1126 if (GST_QUERY_TYPE (query) == GST_QUERY_ACCEPT_CAPS) {
1127 GST_DEBUG_OBJECT (dbin, "Accepting ACCEPT_CAPS query");
1128 gst_query_set_accept_caps_result (query, TRUE);
1131 return gst_pad_query_default (sinkpad, GST_OBJECT (dbin), query);
1135 is_parsebin_required_for_input (GstDecodebin3 * dbin, DecodebinInput * input,
1136 GstCaps * newcaps, GstPad * sinkpad)
1138 gboolean parsebin_needed = TRUE;
1141 stream = gst_pad_get_stream (sinkpad);
1143 if (stream == NULL) {
1144 /* If upstream didn't provide a `GstStream` we will need to create a
1145 * parsebin to handle that stream */
1146 GST_DEBUG_OBJECT (sinkpad,
1147 "Need to create parsebin since upstream doesn't provide GstStream");
1148 } else if (gst_caps_can_intersect (newcaps, dbin->caps)) {
1149 /* If the incoming caps match decodebin3 output, no processing is needed */
1150 GST_FIXME_OBJECT (sinkpad, "parsebin not needed (matches output caps) !");
1151 parsebin_needed = FALSE;
1153 GList *decoder_list;
1154 /* If the incoming caps are compatible with a decoder, we don't need to
1155 * process it before */
1156 g_mutex_lock (&dbin->factories_lock);
1157 gst_decode_bin_update_factories_list (dbin);
1159 gst_element_factory_list_filter (dbin->decoder_factories, newcaps,
1160 GST_PAD_SINK, TRUE);
1161 g_mutex_unlock (&dbin->factories_lock);
1163 GST_FIXME_OBJECT (sinkpad, "parsebin not needed (available decoders) !");
1164 gst_plugin_feature_list_free (decoder_list);
1165 parsebin_needed = FALSE;
1169 gst_object_unref (stream);
1171 return parsebin_needed;
1175 setup_identify_for_input (GstDecodebin3 * dbin, DecodebinInput * input,
1178 GstPad *idsrc, *idsink;
1179 DecodebinInputStream *inputstream;
1181 GST_DEBUG_OBJECT (sinkpad, "Adding identity for new input stream");
1183 input->identity = gst_element_factory_make ("identity", NULL);
1184 /* We drop allocation queries due to our usage of multiqueue just
1185 * afterwards. It is just too dangerous.
1187 * If application users want to have optimal raw source <=> sink allocations
1188 * they should not use decodebin3
1190 g_object_set (input->identity, "drop-allocation", TRUE, NULL);
1191 input->identity = gst_object_ref (input->identity);
1192 idsink = gst_element_get_static_pad (input->identity, "sink");
1193 idsrc = gst_element_get_static_pad (input->identity, "src");
1194 gst_bin_add (GST_BIN (dbin), input->identity);
1196 SELECTION_LOCK (dbin);
1197 inputstream = create_input_stream (dbin, idsrc, input);
1198 /* Forward any existing GstStream directly on the input stream */
1199 inputstream->active_stream = gst_pad_get_stream (sinkpad);
1200 SELECTION_UNLOCK (dbin);
1202 gst_object_unref (idsrc);
1203 gst_object_unref (idsink);
1204 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), idsink);
1205 gst_element_sync_state_with_parent (input->identity);
1209 sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
1211 DecodebinInput *input =
1212 g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1214 g_return_val_if_fail (input, FALSE);
1216 GST_DEBUG_OBJECT (sinkpad, "event %" GST_PTR_FORMAT, event);
1218 switch (GST_EVENT_TYPE (event)) {
1219 case GST_EVENT_STREAM_START:
1221 GstQuery *q = gst_query_new_selectable ();
1223 /* Query whether upstream can handle stream selection or not */
1224 if (gst_pad_peer_query (sinkpad, q)) {
1225 gst_query_parse_selectable (q, &input->upstream_selected);
1226 GST_DEBUG_OBJECT (sinkpad, "Upstream is selectable : %d",
1227 input->upstream_selected);
1229 input->upstream_selected = FALSE;
1230 GST_DEBUG_OBJECT (sinkpad, "Upstream does not handle SELECTABLE query");
1232 gst_query_unref (q);
1234 /* FIXME : We force `decodebin3` to upstream selection mode if *any* of the
1235 inputs is. This means things might break if there's a mix */
1236 if (input->upstream_selected)
1237 dbin->upstream_selected = TRUE;
1239 /* Make sure group ids will be recalculated */
1240 input->group_id = GST_GROUP_ID_INVALID;
1241 recalculate_group_id (dbin);
1244 case GST_EVENT_STREAM_COLLECTION:
1246 GstStreamCollection *collection = NULL;
1248 gst_event_parse_stream_collection (event, &collection);
1251 handle_stream_collection (dbin, collection, input);
1252 gst_object_unref (collection);
1253 INPUT_UNLOCK (dbin);
1254 SELECTION_LOCK (dbin);
1255 /* Post the (potentially) updated collection */
1256 if (dbin->collection) {
1259 gst_message_new_stream_collection ((GstObject *) dbin,
1261 SELECTION_UNLOCK (dbin);
1262 gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
1263 update_requested_selection (dbin);
1265 SELECTION_UNLOCK (dbin);
1268 /* If we are waiting to create an identity passthrough, do it now */
1269 if (!input->parsebin && !input->identity)
1270 setup_identify_for_input (dbin, input, sinkpad);
1273 case GST_EVENT_CAPS:
1275 GstCaps *newcaps = NULL;
1277 gst_event_parse_caps (event, &newcaps);
1280 GST_DEBUG_OBJECT (sinkpad, "new caps %" GST_PTR_FORMAT, newcaps);
1282 /* No parsebin or identity present, check if we can avoid creating one */
1283 if (!input->parsebin && !input->identity) {
1284 if (is_parsebin_required_for_input (dbin, input, newcaps, sinkpad)) {
1285 GST_DEBUG_OBJECT (sinkpad, "parsebin is required for input");
1286 ensure_input_parsebin (dbin, input);
1289 GST_DEBUG_OBJECT (sinkpad,
1290 "parsebin not required. Will create identity passthrough element once we get the collection");
1294 if (input->identity) {
1295 if (is_parsebin_required_for_input (dbin, input, newcaps, sinkpad)) {
1296 GST_ERROR_OBJECT (sinkpad,
1297 "Switching from passthrough to parsebin on inputs is not supported !");
1298 gst_event_unref (event);
1301 /* Nothing else to do here */
1305 /* Check if the parsebin present can handle the new caps */
1306 g_assert (input->parsebin);
1307 GST_DEBUG_OBJECT (sinkpad,
1308 "New caps, checking if they are compatible with existing parsebin");
1309 if (!gst_pad_query_accept_caps (input->parsebin_sink, newcaps)) {
1310 GST_DEBUG_OBJECT (sinkpad,
1311 "Parsebin doesn't accept the new caps %" GST_PTR_FORMAT, newcaps);
1312 /* Reset parsebin so that it reconfigures itself for the new stream format */
1314 reset_input_parsebin (dbin, input);
1315 INPUT_UNLOCK (dbin);
1317 GST_DEBUG_OBJECT (sinkpad, "Parsebin accepts new caps");
1325 /* Chain to parent function */
1326 return gst_pad_event_default (sinkpad, GST_OBJECT (dbin), event);
1329 /* Call with INPUT_LOCK taken */
1330 static DecodebinInput *
1331 create_new_input (GstDecodebin3 * dbin, gboolean main)
1333 DecodebinInput *input;
1335 input = g_new0 (DecodebinInput, 1);
1337 input->is_main = main;
1338 input->group_id = GST_GROUP_ID_INVALID;
1340 input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
1342 gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
1343 input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
1346 input->upstream_selected = FALSE;
1347 g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
1348 gst_pad_set_event_function (input->ghost_sink,
1349 (GstPadEventFunction) sink_event_function);
1350 gst_pad_set_query_function (input->ghost_sink,
1351 (GstPadQueryFunction) sink_query_function);
1352 gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
1353 g_signal_connect (input->ghost_sink, "unlinked",
1354 (GCallback) gst_decodebin3_input_pad_unlink, input);
1356 gst_pad_set_active (input->ghost_sink, TRUE);
1357 gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
1364 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
1365 const gchar * name, const GstCaps * caps)
1367 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1368 DecodebinInput *input;
1371 /* We are ignoring names for the time being, not sure it makes any sense
1372 * within the context of decodebin3 ... */
1373 input = create_new_input (dbin, FALSE);
1376 dbin->other_inputs = g_list_append (dbin->other_inputs, input);
1377 res = input->ghost_sink;
1378 INPUT_UNLOCK (dbin);
1384 /* Must be called with factories lock! */
1386 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
1390 cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
1391 if (!dbin->factories || dbin->factories_cookie != cookie) {
1393 if (dbin->factories)
1394 gst_plugin_feature_list_free (dbin->factories);
1395 if (dbin->decoder_factories)
1396 g_list_free (dbin->decoder_factories);
1397 if (dbin->decodable_factories)
1398 g_list_free (dbin->decodable_factories);
1400 gst_element_factory_list_get_elements
1401 (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
1403 g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
1404 dbin->factories_cookie = cookie;
1406 /* Filter decoder and other decodables */
1407 dbin->decoder_factories = NULL;
1408 dbin->decodable_factories = NULL;
1409 for (tmp = dbin->factories; tmp; tmp = tmp->next) {
1410 GstElementFactory *fact = (GstElementFactory *) tmp->data;
1411 if (gst_element_factory_list_is_type (fact,
1412 GST_ELEMENT_FACTORY_TYPE_DECODER))
1413 dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
1415 dbin->decodable_factories =
1416 g_list_append (dbin->decodable_factories, fact);
1421 /* Must be called with appropriate lock if list is a protected variable */
1422 static const gchar *
1423 stream_in_list (GList * list, const gchar * sid)
1428 for (tmp = list; tmp; tmp = tmp->next) {
1429 gchar *osid = (gchar *) tmp->data;
1430 GST_DEBUG ("Checking %s against %s", sid, osid);
1434 for (tmp = list; tmp; tmp = tmp->next) {
1435 const gchar *osid = (gchar *) tmp->data;
1436 if (!g_strcmp0 (sid, osid))
1444 stream_list_equal (GList * lista, GList * listb)
1448 if (g_list_length (lista) != g_list_length (listb))
1451 for (tmp = lista; tmp; tmp = tmp->next) {
1452 gchar *osid = tmp->data;
1453 if (!stream_in_list (listb, osid))
1461 update_requested_selection (GstDecodebin3 * dbin)
1465 gboolean all_user_selected = TRUE;
1466 GstStreamType used_types = 0;
1467 GstStreamCollection *collection;
1469 /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1470 * the switch handler will take care of the pending selection */
1471 SELECTION_LOCK (dbin);
1472 if (dbin->pending_select_streams) {
1473 GST_DEBUG_OBJECT (dbin,
1474 "No need to create pending selection, SELECT_STREAMS underway");
1478 collection = dbin->collection;
1479 if (G_UNLIKELY (collection == NULL)) {
1480 GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
1483 nb = gst_stream_collection_get_size (collection);
1485 /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1486 GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1488 /* 3. If not, check if we already have some of the streams in the
1489 * existing active/requested selection */
1490 for (i = 0; i < nb; i++) {
1491 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1492 const gchar *sid = gst_stream_get_stream_id (stream);
1494 /* Fire select-stream signal to see if outside components want to
1495 * hint at which streams should be selected */
1496 g_signal_emit (G_OBJECT (dbin),
1497 gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1499 GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1502 all_user_selected = FALSE;
1503 if (request == 1 || (request == -1
1504 && (stream_in_list (dbin->requested_selection, sid)
1505 || stream_in_list (dbin->active_selection, sid)))) {
1506 GstStreamType curtype = gst_stream_get_stream_type (stream);
1508 GST_DEBUG_OBJECT (dbin,
1509 "Using stream requested by 'select-stream' signal : %s", sid);
1511 GST_DEBUG_OBJECT (dbin,
1512 "Re-using stream already present in requested or active selection : %s",
1514 tmp = g_list_append (tmp, (gchar *) sid);
1515 used_types |= curtype;
1519 /* 4. If the user didn't explicitly selected all streams, match one stream of each type */
1520 if (!all_user_selected && dbin->select_streams_seqnum == GST_SEQNUM_INVALID) {
1521 for (i = 0; i < nb; i++) {
1522 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1523 GstStreamType curtype = gst_stream_get_stream_type (stream);
1524 if (curtype != GST_STREAM_TYPE_UNKNOWN && !(used_types & curtype)) {
1525 const gchar *sid = gst_stream_get_stream_id (stream);
1526 GST_DEBUG_OBJECT (dbin,
1527 "Automatically selecting stream '%s' of type %s", sid,
1528 gst_stream_type_get_name (curtype));
1529 tmp = g_list_append (tmp, (gchar *) sid);
1530 used_types |= curtype;
1536 if (stream_list_equal (tmp, dbin->requested_selection)) {
1537 /* If the selection is equal, there is nothign to do */
1538 GST_DEBUG_OBJECT (dbin, "Dropping duplicate selection");
1544 /* Finally set the requested selection */
1545 if (dbin->requested_selection) {
1546 GST_FIXME_OBJECT (dbin,
1547 "Replacing non-NULL requested_selection, what should we do ??");
1548 g_list_free_full (dbin->requested_selection, g_free);
1550 dbin->requested_selection =
1551 g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1552 dbin->selection_updated = TRUE;
1555 SELECTION_UNLOCK (dbin);
1559 * GCompareFunc to use with lists of GstStream.
1560 * Sorts GstStreams by stream type and SELECT flag and stream-id
1561 * First video, then audio, then others.
1563 * Return: negative if a<b, 0 if a==b, positive if a>b
1566 sort_streams (GstStream * sa, GstStream * sb)
1568 GstStreamType typea, typeb;
1569 GstStreamFlags flaga, flagb;
1570 const gchar *ida, *idb;
1573 typea = gst_stream_get_stream_type (sa);
1574 typeb = gst_stream_get_stream_type (sb);
1576 GST_LOG ("sa(%s), sb(%s)", gst_stream_get_stream_id (sa),
1577 gst_stream_get_stream_id (sb));
1579 /* Sort by stream type. First video, then audio, then others(text, container, unknown) */
1580 if (typea != typeb) {
1581 if (typea & GST_STREAM_TYPE_VIDEO)
1583 else if (typea & GST_STREAM_TYPE_AUDIO)
1584 ret = (!(typeb & GST_STREAM_TYPE_VIDEO)) ? -1 : 1;
1585 else if (typea & GST_STREAM_TYPE_TEXT)
1586 ret = (!(typeb & GST_STREAM_TYPE_VIDEO)
1587 && !(typeb & GST_STREAM_TYPE_AUDIO)) ? -1 : 1;
1588 else if (typea & GST_STREAM_TYPE_CONTAINER)
1589 ret = (typeb & GST_STREAM_TYPE_UNKNOWN) ? -1 : 1;
1594 GST_LOG ("Sort by stream-type: %d", ret);
1599 /* Sort by SELECT flag, if stream type is same. */
1600 flaga = gst_stream_get_stream_flags (sa);
1601 flagb = gst_stream_get_stream_flags (sb);
1604 (flaga & GST_STREAM_FLAG_SELECT) ? ((flagb & GST_STREAM_FLAG_SELECT) ? 0 :
1605 -1) : ((flagb & GST_STREAM_FLAG_SELECT) ? 1 : 0);
1608 GST_LOG ("Sort by SELECT flag: %d", ret);
1612 /* Sort by stream-id, if otherwise the same. */
1613 ida = gst_stream_get_stream_id (sa);
1614 idb = gst_stream_get_stream_id (sb);
1615 ret = g_strcmp0 (ida, idb);
1617 GST_LOG ("Sort by stream-id: %d", ret);
1622 /* Call with INPUT_LOCK taken */
1623 static GstStreamCollection *
1624 get_merged_collection (GstDecodebin3 * dbin)
1626 gboolean needs_merge = FALSE;
1627 GstStreamCollection *res = NULL;
1629 GList *unsorted_streams = NULL;
1632 /* First check if we need to do a merge or just return the only collection */
1633 res = dbin->main_input->collection;
1635 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1636 DecodebinInput *input = (DecodebinInput *) tmp->data;
1637 if (input->collection) {
1642 res = input->collection;
1647 GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1648 return res ? gst_object_ref (res) : NULL;
1651 /* We really need to create a new collection */
1652 /* FIXME : Some numbering scheme maybe ?? */
1653 res = gst_stream_collection_new ("decodebin3");
1654 if (dbin->main_input->collection) {
1655 nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1656 GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1657 for (i = 0; i < nb_stream; i++) {
1659 gst_stream_collection_get_stream (dbin->main_input->collection, i);
1660 unsorted_streams = g_list_append (unsorted_streams, stream);
1664 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1665 DecodebinInput *input = (DecodebinInput *) tmp->data;
1666 GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1668 if (input->collection) {
1669 nb_stream = gst_stream_collection_get_size (input->collection);
1670 GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1671 for (i = 0; i < nb_stream; i++) {
1673 gst_stream_collection_get_stream (input->collection, i);
1674 /* Only add if not already present in the list */
1675 if (!g_list_find (unsorted_streams, stream))
1676 unsorted_streams = g_list_append (unsorted_streams, stream);
1681 /* re-order streams : video, then audio, then others */
1683 g_list_sort (unsorted_streams, (GCompareFunc) sort_streams);
1684 for (tmp = unsorted_streams; tmp; tmp = tmp->next) {
1685 GstStream *stream = (GstStream *) tmp->data;
1686 GST_DEBUG_OBJECT (dbin, "Adding #stream(%s) to collection",
1687 gst_stream_get_stream_id (stream));
1688 gst_stream_collection_add_stream (res, gst_object_ref (stream));
1691 if (unsorted_streams)
1692 g_list_free (unsorted_streams);
1697 /* Call with INPUT_LOCK taken */
1698 static DecodebinInput *
1699 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1701 DecodebinInput *input = NULL;
1702 GstElement *parent = gst_object_ref (child);
1706 GstElement *next_parent;
1708 GST_DEBUG_OBJECT (dbin, "parent %s",
1709 parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1711 if (parent == dbin->main_input->parsebin) {
1712 input = dbin->main_input;
1715 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1716 DecodebinInput *cur = (DecodebinInput *) tmp->data;
1717 if (parent == cur->parsebin) {
1722 next_parent = (GstElement *) gst_element_get_parent (parent);
1723 gst_object_unref (parent);
1724 parent = next_parent;
1726 } while (parent && parent != (GstElement *) dbin);
1729 gst_object_unref (parent);
1734 static const gchar *
1735 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1739 if (dbin->collection == NULL)
1741 len = gst_stream_collection_get_size (dbin->collection);
1742 for (i = 0; i < len; i++) {
1743 GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1744 const gchar *osid = gst_stream_get_stream_id (stream);
1745 if (!g_strcmp0 (sid, osid))
1752 /* Call with INPUT_LOCK taken */
1754 handle_stream_collection (GstDecodebin3 * dbin,
1755 GstStreamCollection * collection, DecodebinInput * input)
1757 #ifndef GST_DISABLE_GST_DEBUG
1758 const gchar *upstream_id;
1762 GST_DEBUG_OBJECT (dbin,
1763 "Couldn't find corresponding input, most likely shutting down");
1767 /* Replace collection in input */
1768 if (input->collection)
1769 gst_object_unref (input->collection);
1770 input->collection = gst_object_ref (collection);
1771 GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1774 /* Merge collection if needed */
1775 collection = get_merged_collection (dbin);
1777 #ifndef GST_DISABLE_GST_DEBUG
1778 /* Just some debugging */
1779 upstream_id = gst_stream_collection_get_upstream_id (collection);
1780 GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1781 GST_DEBUG ("From input %p", input);
1782 GST_DEBUG (" %d streams", gst_stream_collection_get_size (collection));
1783 for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1784 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1785 GstTagList *taglist;
1788 GST_DEBUG (" Stream '%s'", gst_stream_get_stream_id (stream));
1789 GST_DEBUG (" type : %s",
1790 gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1791 GST_DEBUG (" flags : 0x%x", gst_stream_get_stream_flags (stream));
1792 taglist = gst_stream_get_tags (stream);
1793 GST_DEBUG (" tags : %" GST_PTR_FORMAT, taglist);
1794 caps = gst_stream_get_caps (stream);
1795 GST_DEBUG (" caps : %" GST_PTR_FORMAT, caps);
1797 gst_tag_list_unref (taglist);
1799 gst_caps_unref (caps);
1803 /* Store collection for later usage */
1804 SELECTION_LOCK (dbin);
1805 if (dbin->collection == NULL) {
1806 dbin->collection = collection;
1808 /* We need to check who emitted this collection (the owner).
1809 * If we already had a collection from that user, this one is an update,
1810 * that is to say that we need to figure out how we are going to re-use
1811 * the streams/slot */
1812 GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1813 /* FIXME : When do we switch from pending collection to active collection ?
1814 * When all streams from active collection are drained in multiqueue output ? */
1815 gst_object_unref (dbin->collection);
1816 dbin->collection = collection;
1817 /* dbin->pending_collection = */
1818 /* g_list_append (dbin->pending_collection, collection); */
1820 dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1821 SELECTION_UNLOCK (dbin);
1824 /* Must be called with the selection lock taken */
1826 gst_decodebin3_update_min_interleave (GstDecodebin3 * dbin)
1828 GstClockTime max_latency = GST_CLOCK_TIME_NONE;
1831 GST_DEBUG_OBJECT (dbin, "Recalculating max latency of decoders");
1832 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1833 DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1834 if (GST_CLOCK_TIME_IS_VALID (out->decoder_latency)) {
1835 if (max_latency == GST_CLOCK_TIME_NONE
1836 || out->decoder_latency > max_latency)
1837 max_latency = out->decoder_latency;
1840 GST_DEBUG_OBJECT (dbin, "max latency of all decoders: %" GST_TIME_FORMAT,
1841 GST_TIME_ARGS (max_latency));
1843 if (!GST_CLOCK_TIME_IS_VALID (max_latency))
1846 /* Make sure we keep an extra overhead */
1847 max_latency += 100 * GST_MSECOND;
1848 if (max_latency == dbin->current_mq_min_interleave)
1851 dbin->current_mq_min_interleave = max_latency;
1852 GST_DEBUG_OBJECT (dbin, "Setting mq min-interleave to %" GST_TIME_FORMAT,
1853 GST_TIME_ARGS (dbin->current_mq_min_interleave));
1854 g_object_set (dbin->multiqueue, "min-interleave-time",
1855 dbin->current_mq_min_interleave, NULL);
1859 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1861 GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1862 gboolean posting_collection = FALSE;
1864 GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1866 switch (GST_MESSAGE_TYPE (message)) {
1867 case GST_MESSAGE_STREAM_COLLECTION:
1869 GstStreamCollection *collection = NULL;
1870 DecodebinInput *input;
1874 find_message_parsebin (dbin,
1875 (GstElement *) GST_MESSAGE_SRC (message));
1876 if (input == NULL) {
1877 GST_DEBUG_OBJECT (dbin,
1878 "Couldn't find corresponding input, most likely shutting down");
1879 INPUT_UNLOCK (dbin);
1882 if (input->upstream_selected) {
1883 GST_DEBUG_OBJECT (dbin,
1884 "Upstream handles selection, not using/forwarding collection");
1885 INPUT_UNLOCK (dbin);
1888 gst_message_parse_stream_collection (message, &collection);
1890 handle_stream_collection (dbin, collection, input);
1891 posting_collection = TRUE;
1893 INPUT_UNLOCK (dbin);
1895 SELECTION_LOCK (dbin);
1896 if (dbin->collection) {
1897 /* Replace collection message, we most likely aggregated it */
1898 GstMessage *new_msg;
1900 gst_message_new_stream_collection ((GstObject *) dbin,
1902 gst_message_unref (message);
1905 SELECTION_UNLOCK (dbin);
1908 gst_object_unref (collection);
1911 case GST_MESSAGE_LATENCY:
1914 /* Check if this is from one of our decoders */
1915 SELECTION_LOCK (dbin);
1916 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1917 DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1918 if (out->decoder == (GstElement *) GST_MESSAGE_SRC (message)) {
1919 GstClockTime min, max;
1920 if (GST_IS_VIDEO_DECODER (out->decoder)) {
1921 gst_video_decoder_get_latency (GST_VIDEO_DECODER (out->decoder),
1923 GST_DEBUG_OBJECT (dbin,
1924 "Got latency update from one of our decoders. min: %"
1925 GST_TIME_FORMAT " max: %" GST_TIME_FORMAT, GST_TIME_ARGS (min),
1926 GST_TIME_ARGS (max));
1927 out->decoder_latency = min;
1928 /* Trigger recalculation */
1929 gst_decodebin3_update_min_interleave (dbin);
1934 SELECTION_UNLOCK (dbin);
1940 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1942 if (posting_collection) {
1943 /* Figure out a selection for that collection */
1944 update_requested_selection (dbin);
1951 GST_DEBUG_OBJECT (bin, "dropping message");
1952 gst_message_unref (message);
1956 static DecodebinOutputStream *
1957 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1960 GstStreamType stype = gst_stream_get_stream_type (stream);
1962 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1963 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1964 if (output->type == stype && output->slot && output->slot->active_stream) {
1965 GstStream *tstream = output->slot->active_stream;
1966 if (!stream_in_list (dbin->requested_selection,
1967 (gchar *) gst_stream_get_stream_id (tstream))) {
1976 /* Give a certain slot, figure out if it should be linked to an
1978 * CALL WITH SELECTION LOCK TAKEN !*/
1979 static DecodebinOutputStream *
1980 get_output_for_slot (MultiQueueSlot * slot)
1982 GstDecodebin3 *dbin = slot->dbin;
1983 DecodebinOutputStream *output = NULL;
1984 const gchar *stream_id;
1986 gchar *id_in_list = NULL;
1988 /* If we already have a configured output, just use it */
1989 if (slot->output != NULL)
1990 return slot->output;
1995 * This method needs to be split into multiple parts
1997 * 1) Figure out whether stream should be exposed or not
1998 * This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1999 * in the default stream attribution
2001 * 2) Figure out whether an output stream should be created, whether
2002 * we can re-use the output stream already linked to the slot, or
2003 * whether we need to get re-assigned another (currently used) output
2007 stream_id = gst_stream_get_stream_id (slot->active_stream);
2008 caps = gst_stream_get_caps (slot->active_stream);
2009 GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
2010 gst_caps_unref (caps);
2012 /* 0. Emit autoplug-continue signal for pending caps ? */
2013 GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
2015 /* 1. if in EXPOSE_ALL_MODE, just accept */
2016 GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
2019 /* FIXME : The idea around this was to avoid activating a stream for
2020 * which we have no decoder. Unfortunately it is way too
2021 * expensive. Need to figure out a better solution */
2022 /* 2. Is there a potential decoder (if one is required) */
2023 if (!gst_caps_can_intersect (caps, dbin->caps)
2024 && !have_factory (dbin, (GstCaps *) caps,
2025 GST_ELEMENT_FACTORY_TYPE_DECODER)) {
2026 GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
2028 SELECTION_UNLOCK (dbin);
2029 gst_element_post_message (GST_ELEMENT_CAST (dbin),
2030 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2031 SELECTION_LOCK (dbin);
2036 /* 3. In default mode check if we should expose */
2037 id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
2038 if (id_in_list || dbin->upstream_selected) {
2039 /* Check if we can steal an existing output stream we could re-use.
2041 * * an output stream whose slot->stream is not in requested
2042 * * and is of the same type as this stream
2044 output = find_free_compatible_output (dbin, slot->active_stream);
2046 /* Move this output from its current slot to this slot */
2048 g_list_append (dbin->to_activate, (gchar *) stream_id);
2049 dbin->requested_selection =
2050 g_list_remove (dbin->requested_selection, id_in_list);
2051 g_free (id_in_list);
2052 SELECTION_UNLOCK (dbin);
2053 gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2054 (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
2055 SELECTION_LOCK (dbin);
2059 output = create_output_stream (dbin, slot->type);
2060 output->slot = slot;
2061 GST_DEBUG ("Linking slot %p to new output %p", slot, output);
2062 slot->output = output;
2063 GST_DEBUG ("Adding '%s' to active_selection", stream_id);
2064 dbin->active_selection =
2065 g_list_append (dbin->active_selection, (gchar *) g_strdup (stream_id));
2067 GST_DEBUG ("Not creating any output for slot %p", slot);
2072 /* Returns SELECTED_STREAMS message if active_selection is equal to
2073 * requested_selection, else NULL.
2074 * Must be called with LOCK taken */
2076 is_selection_done (GstDecodebin3 * dbin)
2081 if (!dbin->selection_updated)
2084 GST_LOG_OBJECT (dbin, "Checking");
2086 if (dbin->to_activate != NULL) {
2087 GST_DEBUG ("Still have streams to activate");
2090 for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
2091 GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
2092 if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
2093 GST_DEBUG ("Not in active selection, returning");
2098 GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
2100 /* We are completely active */
2101 msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
2102 if (dbin->select_streams_seqnum != GST_SEQNUM_INVALID) {
2103 gst_message_set_seqnum (msg, dbin->select_streams_seqnum);
2105 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2106 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2108 GST_DEBUG_OBJECT (dbin, "Adding stream %s",
2109 gst_stream_get_stream_id (output->slot->active_stream));
2111 gst_message_streams_selected_add (msg, output->slot->active_stream);
2113 GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
2115 dbin->selection_updated = FALSE;
2119 /* Must be called with SELECTION_LOCK taken */
/* check_all_slot_for_eos:
 * @dbin: decodebin3 instance whose slots and input streams are examined
 * @ev: event whose seqnum is copied onto the EOS events created here
 *
 * Looks at the is_drained flag of the slots in dbin->slots and at
 * all_inputs_are_eos(). Per the debug message below, once all active slots
 * are drained and no input is pending, a new EOS is sent to the peer of
 * every input stream's srcpad. */
2121 check_all_slot_for_eos (GstDecodebin3 * dbin, GstEvent * ev)
2123 gboolean all_drained = TRUE;
2126 GST_DEBUG_OBJECT (dbin, "check slot for eos");
2128 for (iter = dbin->slots; iter; iter = iter->next) {
2129 MultiQueueSlot *slot = iter->data;
2134 if (slot->is_drained) {
2135 GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
2139 all_drained = FALSE;
2143 /* Also check with the inputs, data might be pending */
2145 all_drained = all_inputs_are_eos (dbin);
2148 GST_DEBUG_OBJECT (dbin,
2149 "All active slots are drained, and no pending input, push EOS");
2151 for (iter = dbin->input_streams; iter; iter = iter->next) {
2152 DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
2153 GstPad *peer = gst_pad_get_peer (input->srcpad);
2155 /* Send EOS to all slots */
2157 GstEvent *stream_start, *eos;
2160 gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
2162 /* First forward a custom STREAM_START event to reset the EOS status (if any) */
/* Work on a copy so the marker field can be added to its structure; the
 * marker is what multiqueue_src_probe looks for to drop this event again. */
2165 GstEvent *custom_stream_start = gst_event_copy (stream_start);
2166 gst_event_unref (stream_start);
2167 s = (GstStructure *) gst_event_get_structure (custom_stream_start);
2168 gst_structure_set (s, "decodebin3-flushing-stream-start",
2169 G_TYPE_BOOLEAN, TRUE, NULL);
2170 gst_pad_send_event (peer, custom_stream_start);
2173 eos = gst_event_new_eos ();
2174 gst_event_set_seqnum (eos, gst_event_get_seqnum (ev));
/* Tag the EOS with CUSTOM_FINAL_EOS_QUARK; multiqueue_src_probe tests for
 * this qdata to recognise the final EOS. */
2175 gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos),
2176 CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA,
2178 gst_pad_send_event (peer, eos);
2179 gst_object_unref (peer);
2181 GST_DEBUG_OBJECT (dbin, "no output");
/* multiqueue_src_probe:
 * @pad: the multiqueue source pad this probe is attached to
 * @info: probe info carrying either an event or a query
 * @slot: the MultiQueueSlot passed as user data when the probe was added
 *
 * Probe callback installed by create_new_slot() for downstream events and
 * downstream queries. It updates slot->active_stream on STREAM_START,
 * (re)configures the slot's output on CAPS, handles the different EOS
 * flavours (custom EOS, final EOS, regular EOS) and answers CAPS and
 * ACCEPT_CAPS queries itself.
 *
 * Returns: a GstPadProbeReturn, GST_PAD_PROBE_OK unless one of the cases
 * below overrides it. */
2186 static GstPadProbeReturn
2187 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
2188 MultiQueueSlot * slot)
2190 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2191 GstDecodebin3 *dbin = slot->dbin;
2193 if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
2194 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
2196 GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
2197 switch (GST_EVENT_TYPE (ev)) {
2198 case GST_EVENT_STREAM_START:
2200 GstStream *stream = NULL;
2201 const GstStructure *s = gst_event_get_structure (ev);
2203 /* Drop STREAM_START events used to cleanup multiqueue */
2205 && gst_structure_has_field (s,
2206 "decodebin3-flushing-stream-start")) {
2207 ret = GST_PAD_PROBE_HANDLED;
2208 gst_event_unref (ev);
2212 gst_event_parse_stream (ev, &stream);
2213 if (stream == NULL) {
2214 GST_ERROR_OBJECT (pad,
2215 "Got a STREAM_START event without a GstStream");
/* New data is starting on this slot, so it is no longer drained */
2218 slot->is_drained = FALSE;
2219 GST_DEBUG_OBJECT (pad, "Stream Start '%s'",
2220 gst_stream_get_stream_id (stream));
/* The slot keeps the reference obtained from gst_event_parse_stream()
 * when it stores the stream; otherwise that reference is dropped. */
2221 if (slot->active_stream == NULL) {
2222 slot->active_stream = stream;
2223 } else if (slot->active_stream != stream) {
2224 GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
2225 gst_stream_get_stream_id (slot->active_stream),
2226 gst_stream_get_stream_id (stream));
2227 gst_object_unref (slot->active_stream);
2228 slot->active_stream = stream;
2230 gst_object_unref (stream);
2231 #if 0 /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
2233 gboolean is_active, is_requested;
2234 /* Quick check to see if we're in the current selection */
2235 /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
2236 SELECTION_LOCK (dbin);
2237 GST_DEBUG_OBJECT (dbin, "Checking active selection");
2238 is_active = stream_in_list (dbin->active_selection, stream_id);
2239 GST_DEBUG_OBJECT (dbin, "Checking requested selection");
2240 is_requested = stream_in_list (dbin->requested_selection, stream_id);
2241 SELECTION_UNLOCK (dbin);
2243 GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
2246 GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
2248 else if (slot->output) {
2249 GST_DEBUG_OBJECT (pad,
2250 "Slot needs to be deactivated ? It's no longer in requested selection");
2251 } else if (!is_active)
2252 GST_DEBUG_OBJECT (pad,
2253 "Slot in neither active nor requested selection");
2258 case GST_EVENT_CAPS:
2260 /* Configure the output slot if needed */
2261 DecodebinOutputStream *output;
2262 GstMessage *msg = NULL;
2263 SELECTION_LOCK (dbin);
2264 output = get_output_for_slot (slot);
2266 reconfigure_output_stream (output, slot);
2267 msg = is_selection_done (dbin);
2269 SELECTION_UNLOCK (dbin);
/* The message is posted only after the SELECTION_LOCK has been released */
2271 gst_element_post_message ((GstElement *) slot->dbin, msg);
/* Remember whether the slot had already seen EOS before marking it drained */
2276 gboolean was_drained = slot->is_drained;
2277 slot->is_drained = TRUE;
2279 /* Custom EOS handling first */
2280 if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2281 CUSTOM_EOS_QUARK)) {
2282 /* remove custom-eos */
2283 ev = gst_event_make_writable (ev);
2284 GST_PAD_PROBE_INFO_DATA (info) = ev;
2285 gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
2286 CUSTOM_EOS_QUARK, NULL, NULL);
2288 GST_LOG_OBJECT (pad, "Received custom EOS");
2289 ret = GST_PAD_PROBE_HANDLED;
2290 SELECTION_LOCK (dbin);
2291 if (slot->input == NULL) {
2292 GST_DEBUG_OBJECT (pad,
2293 "Got custom-eos from null input stream, remove output stream");
2294 /* Remove the output */
2296 DecodebinOutputStream *output = slot->output;
2297 dbin->output_streams =
2298 g_list_remove (dbin->output_streams, output);
2299 free_output_stream (dbin, output);
2300 /* Recalculate min interleave */
2301 gst_decodebin3_update_min_interleave (dbin);
2304 dbin->slots = g_list_remove (dbin->slots, slot);
2305 free_multiqueue_slot_async (dbin, slot);
2306 ret = GST_PAD_PROBE_REMOVE;
2307 } else if (!was_drained) {
2308 check_all_slot_for_eos (dbin, ev);
/* The event is only unreffed here when this probe kept ownership of it
 * (ret is still GST_PAD_PROBE_HANDLED) */
2310 if (ret == GST_PAD_PROBE_HANDLED)
2311 gst_event_unref (ev);
2312 SELECTION_UNLOCK (dbin);
2316 GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
2318 if (slot->input == NULL) {
2320 GST_DEBUG_OBJECT (pad,
2321 "last EOS for input, forwarding and removing slot");
2322 peer = gst_pad_get_peer (pad);
2324 gst_pad_send_event (peer, gst_event_ref (ev));
2325 gst_object_unref (peer);
2327 SELECTION_LOCK (dbin);
2328 /* FIXME : Shouldn't we try to re-assign the output instead of just
2330 /* Remove the output */
2332 DecodebinOutputStream *output = slot->output;
2333 dbin->output_streams = g_list_remove (dbin->output_streams, output);
2334 free_output_stream (dbin, output);
2337 dbin->slots = g_list_remove (dbin->slots, slot);
2338 SELECTION_UNLOCK (dbin);
2340 /* FIXME: Removing the slot is async, which means actually
2341 * unlinking the pad is async. Other things like stream-start
2342 * might flow through this (now unprobed) link before it actually
2344 free_multiqueue_slot_async (dbin, slot);
2345 ret = GST_PAD_PROBE_REMOVE;
2346 } else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2347 CUSTOM_FINAL_EOS_QUARK)) {
2348 GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream");
2350 GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)");
2351 /* drop current event as eos will be sent in check_all_slot_for_eos
2352 * when all output streams are also eos */
2353 ret = GST_PAD_PROBE_DROP;
2354 SELECTION_LOCK (dbin);
2355 check_all_slot_for_eos (dbin, ev);
2356 SELECTION_UNLOCK (dbin);
/* Caps-related queries are answered here and reported as
 * GST_PAD_PROBE_HANDLED */
2363 } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
2364 GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
2365 switch (GST_QUERY_TYPE (query)) {
2366 case GST_QUERY_CAPS:
2368 GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
2369 gst_query_set_caps_result (query, GST_CAPS_ANY);
2370 ret = GST_PAD_PROBE_HANDLED;
2374 case GST_QUERY_ACCEPT_CAPS:
2376 GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
2377 /* If the current decoder doesn't accept caps, we'll reconfigure
2378 * on the actual caps event. So accept any caps. */
2379 gst_query_set_accept_caps_result (query, TRUE);
2380 ret = GST_PAD_PROBE_HANDLED;
2390 /* Create a new multiqueue slot for the given type
2392 * It is up to the caller to know whether that slot is needed or not
2393 * (and release it when no longer needed) */
2394 static MultiQueueSlot *
2395 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
2397 MultiQueueSlot *slot;
2398 GstIterator *it = NULL;
2399 GValue item = { 0, };
2401 GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
2402 gst_stream_type_get_name (type));
2403 slot = g_new0 (MultiQueueSlot, 1);
2406 slot->id = dbin->slot_id++;
2409 slot->sink_pad = gst_element_request_pad_simple (dbin->multiqueue, "sink_%u");
2410 if (slot->sink_pad == NULL)
2413 it = gst_pad_iterate_internal_links (slot->sink_pad);
2414 if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
2415 || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
2416 GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
2417 GST_DEBUG_PAD_NAME (slot->src_pad));
2420 gst_iterator_free (it);
2421 g_value_reset (&item);
2423 g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
2425 /* Add event probe */
2427 gst_pad_add_probe (slot->src_pad,
2428 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2429 (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
2431 GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
2432 GST_DEBUG_PAD_NAME (slot->src_pad));
2434 dbin->slots = g_list_append (dbin->slots, slot);
2442 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2448 /* Must be called with SELECTION_LOCK */
/* get_slot_for_input:
 * @dbin: the decodebin3 instance
 * @input: the input stream that needs a multiqueue slot
 *
 * Searches dbin->slots in two passes: first for a slot whose input is
 * already @input, then for a slot with no input whose type equals the
 * type of @input's active stream (comparing stream ids where both sides have
 * one). If neither pass yields a slot, a new one is created with
 * create_new_slot().
 *
 * Returns: a MultiQueueSlot; the fallback path returns whatever
 * create_new_slot() returns. */
2449 static MultiQueueSlot *
2450 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
2453 MultiQueueSlot *empty_slot = NULL;
2454 GstStreamType input_type = 0;
2455 gchar *stream_id = NULL;
2457 GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
2458 input, input->active_stream,
2460 active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
/* Without an active stream, input_type stays 0 and stream_id stays NULL */
2462 if (input->active_stream) {
2463 input_type = gst_stream_get_stream_type (input->active_stream);
2464 stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
2467 /* Go over existing slots and check if there is already one for it */
2468 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2469 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2470 /* Already used input, return that one */
2471 if (slot->input == input) {
2472 GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
2477 /* Go amongst all unused slots of the right type and try to find a candidate */
2478 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2479 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2480 if (slot->input == NULL && input_type == slot->type) {
2481 /* Remember this empty slot for later */
2483 /* Check if available slot is of the same stream_id */
2484 GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
2485 slot->id, slot->active_stream);
2486 if (stream_id && slot->active_stream) {
2488 (gchar *) gst_stream_get_stream_id (slot->active_stream);
2489 GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
2490 ostream_id, stream_id);
2491 if (!g_strcmp0 (stream_id, ostream_id))
/* A re-used slot is bound to the input here */
2498 GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
2499 empty_slot->input = input;
2504 return create_new_slot (dbin, input_type);
/* link_input_to_slot:
 * @input: the input stream whose srcpad is linked
 * @slot: the slot whose multiqueue sink pad receives the link
 *
 * Links input->srcpad to slot->sink_pad without any link checks
 * (GST_PAD_LINK_CHECK_NOTHING), then records the input's active stream as
 * the slot's pending_stream and stores @input in slot->input. An error is
 * logged if the slot is already used by a different input.
 * NOTE(review): presumably that error case returns before linking; confirm. */
2510 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
2512 if (slot->input != NULL && slot->input != input) {
2513 GST_ERROR_OBJECT (slot->dbin,
2514 "Trying to link input to an already used slot");
2517 gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
2518 slot->pending_stream = input->active_stream;
2519 slot->input = input;
2524 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
2525 GstElementFactoryListType ftype)
2527 gboolean ret = FALSE;
2530 g_mutex_lock (&dbin->factories_lock);
2531 gst_decode_bin_update_factories_list (dbin);
2532 if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
2534 gst_element_factory_list_filter (dbin->decoder_factories,
2535 caps, GST_PAD_SINK, TRUE);
2538 gst_element_factory_list_filter (dbin->decodable_factories,
2539 caps, GST_PAD_SINK, TRUE);
2540 g_mutex_unlock (&dbin->factories_lock);
2544 gst_plugin_feature_list_free (res);
2552 create_decoder_factory_list (GstDecodebin3 * dbin, GstCaps * caps)
2556 g_mutex_lock (&dbin->factories_lock);
2557 gst_decode_bin_update_factories_list (dbin);
2558 res = gst_element_factory_list_filter (dbin->decoder_factories,
2559 caps, GST_PAD_SINK, TRUE);
2560 g_mutex_unlock (&dbin->factories_lock);
2564 static GstPadProbeReturn
2565 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
2566 DecodebinOutputStream * output)
2568 GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2569 /* If we have a keyframe, remove the probe and let all data through */
2570 /* FIXME : HANDLE HEADER BUFFER ?? */
2571 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
2572 GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
2573 GST_DEBUG_OBJECT (pad,
2574 "Buffer is keyframe or header, letting through and removing probe");
2575 output->drop_probe_id = 0;
2576 return GST_PAD_PROBE_REMOVE;
2578 GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
2579 return GST_PAD_PROBE_DROP;
/* reconfigure_output_stream:
 * @output: the output stream to (re)configure
 * @slot: the multiqueue slot that should feed @output
 *
 * Makes @output consume @slot. A decoder is considered necessary when the
 * caps of the slot's active stream cannot intersect dbin->caps. An existing
 * decoder is kept if it accepts the new caps; otherwise it is removed and
 * the factories returned by create_decoder_factory_list() are tried in turn.
 * Finally the output ghost pad is targeted at the decoder source pad (or at
 * the slot source pad when no decoder is needed) and, the first time, added
 * to the element.
 *
 * The SELECTION_LOCK is released and re-taken around posting the
 * missing-decoder message; the callers visible in this part of the file
 * hold that lock when calling. */
2583 reconfigure_output_stream (DecodebinOutputStream * output,
2584 MultiQueueSlot * slot)
2586 GstDecodebin3 *dbin = output->dbin;
2587 GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
2588 gboolean needs_decoder;
/* No decoder is needed when the stream caps already intersect dbin->caps */
2590 needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
2592 GST_DEBUG_OBJECT (dbin,
2593 "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
2596 /* FIXME : Maybe make the output un-hook itself automatically ? */
2597 if (output->slot != NULL && output->slot != slot) {
2598 GST_WARNING_OBJECT (dbin,
2599 "Output still linked to another slot (%p)", output->slot);
2600 gst_caps_unref (new_caps);
2604 /* Check if existing config is reusable as-is by checking if
2605 * the existing decoder accepts the new caps, if not delete
2606 * it and create a new one */
2607 if (output->decoder) {
2608 gboolean can_reuse_decoder;
2610 if (needs_decoder) {
2612 gst_pad_query_accept_caps (output->decoder_sink, new_caps);
2614 can_reuse_decoder = FALSE;
2616 if (can_reuse_decoder) {
2617 if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
2618 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2619 output->drop_probe_id =
2620 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2621 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2623 GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
2624 if (output->linked == FALSE) {
2625 gst_pad_link_full (slot->src_pad, output->decoder_sink,
2626 GST_PAD_LINK_CHECK_NOTHING);
2627 output->linked = TRUE;
2629 gst_caps_unref (new_caps);
2633 GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
2636 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2637 output->linked = FALSE;
2638 if (output->drop_probe_id) {
2639 gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2640 output->drop_probe_id = 0;
2643 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2644 GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
2645 gst_caps_unref (new_caps);
/* Lock the state so the element stays in NULL until it is removed */
2649 gst_element_set_locked_state (output->decoder, TRUE);
2650 gst_element_set_state (output->decoder, GST_STATE_NULL);
2652 gst_bin_remove ((GstBin *) dbin, output->decoder);
2653 output->decoder = NULL;
2654 output->decoder_latency = GST_CLOCK_TIME_NONE;
2655 } else if (output->linked) {
2656 /* Otherwise if we have no decoder yet but the output is linked make
2657 * sure that the ghost pad is really unlinked in case no decoder was
2658 * needed previously */
2659 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2660 GST_ERROR_OBJECT (dbin, "Could not release ghost pad");
2661 gst_caps_unref (new_caps);
/* Drop the pad references kept from the previous configuration */
2666 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2667 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2669 /* If a decoder is required, create one */
2670 if (needs_decoder) {
2671 GList *factories, *next_factory;
2673 factories = next_factory = create_decoder_factory_list (dbin, new_caps);
2674 while (!output->decoder) {
2675 gboolean decoder_failed = FALSE;
2677 /* If we don't have a decoder yet, instantiate one */
2679 output->decoder = gst_element_factory_create ((GstElementFactory *)
2680 next_factory->data, NULL);
2681 GST_DEBUG ("Created decoder '%s'", GST_ELEMENT_NAME (output->decoder));
2683 GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT,
2686 if (output->decoder == NULL) {
/* The lock is released while the missing-decoder message is posted */
2689 SELECTION_UNLOCK (dbin);
2690 /* FIXME : Should we be smarter if there's a missing decoder ?
2691 * Should we deactivate that stream ? */
2692 caps = gst_stream_get_caps (slot->active_stream);
2693 gst_element_post_message (GST_ELEMENT_CAST (dbin),
2694 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2695 gst_caps_unref (caps);
2696 SELECTION_LOCK (dbin);
2699 if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
2700 GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
2703 output->decoder_sink =
2704 gst_element_get_static_pad (output->decoder, "sink");
2705 output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
2706 if (output->type & GST_STREAM_TYPE_VIDEO) {
2707 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2708 output->drop_probe_id =
2709 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2710 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2712 if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
2713 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2714 GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
2715 GST_DEBUG_PAD_NAME (output->decoder_sink));
/* A candidate is rejected if it cannot reach READY or refuses the caps */
2718 if (gst_element_set_state (output->decoder,
2719 GST_STATE_READY) == GST_STATE_CHANGE_FAILURE) {
2720 GST_DEBUG_OBJECT (dbin,
2721 "Decoder '%s' failed to reach READY state, trying the next type",
2722 GST_ELEMENT_NAME (output->decoder));
2723 decoder_failed = TRUE;
2725 if (!gst_pad_query_accept_caps (output->decoder_sink, new_caps)) {
2726 GST_DEBUG_OBJECT (dbin,
2727 "Decoder '%s' did not accept the caps, trying the next type",
2728 GST_ELEMENT_NAME (output->decoder));
2729 decoder_failed = TRUE;
/* Undo everything done for the rejected candidate before trying the next */
2731 if (decoder_failed) {
2732 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2733 if (output->drop_probe_id) {
2734 gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2735 output->drop_probe_id = 0;
2738 gst_element_set_locked_state (output->decoder, TRUE);
2739 gst_element_set_state (output->decoder, GST_STATE_NULL);
2741 gst_bin_remove ((GstBin *) dbin, output->decoder);
2742 output->decoder = NULL;
2744 next_factory = next_factory->next;
2746 gst_plugin_feature_list_free (factories);
/* No decoder needed: the slot source pad itself becomes the ghost pad target */
2748 output->decoder_src = gst_object_ref (slot->src_pad);
2749 output->decoder_sink = NULL;
2751 gst_caps_unref (new_caps);
2753 output->linked = TRUE;
2754 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
2755 output->decoder_src)) {
2756 GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
2759 if (output->src_exposed == FALSE) {
2760 GstEvent *stream_start;
2762 stream_start = gst_pad_get_sticky_event (slot->src_pad,
2763 GST_EVENT_STREAM_START, 0);
2765 /* Ensure GstStream is accessible from pad-added callback */
2767 gst_pad_store_sticky_event (output->src_pad, stream_start);
2768 gst_event_unref (stream_start);
2770 GST_WARNING_OBJECT (slot->src_pad,
2771 "Pad has no stored stream-start event");
2774 output->src_exposed = TRUE;
2775 gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
2778 if (output->decoder)
2779 gst_element_sync_state_with_parent (output->decoder);
2781 output->slot = slot;
/* Error path: drops the decoder pads and the decoder itself.
 * NOTE(review): nothing visible in this cleanup section releases new_caps
 * or the factories list; confirm that every path reaching it has already
 * released them. */
2786 GST_DEBUG_OBJECT (dbin, "Cleanup");
2787 if (output->decoder_sink) {
2788 gst_object_unref (output->decoder_sink);
2789 output->decoder_sink = NULL;
2791 if (output->decoder_src) {
2792 gst_object_unref (output->decoder_src);
2793 output->decoder_src = NULL;
2795 if (output->decoder) {
2796 gst_element_set_state (output->decoder, GST_STATE_NULL);
2797 gst_bin_remove ((GstBin *) dbin, output->decoder);
2798 output->decoder = NULL;
/* idle_reconfigure:
 * @pad: the slot source pad the idle probe was installed on
 * @info: probe info (not used by the visible code)
 * @slot: the slot to reconfigure, passed as probe user data
 *
 * One-shot idle probe: under the SELECTION_LOCK it fetches the output for
 * @slot, reconfigures it and asks is_selection_done() for a message, which
 * is posted on the element after the lock has been released.
 *
 * Returns: GST_PAD_PROBE_REMOVE, so the probe runs only once. */
2803 static GstPadProbeReturn
2804 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2806 GstMessage *msg = NULL;
2807 DecodebinOutputStream *output;
2809 SELECTION_LOCK (slot->dbin);
2810 output = get_output_for_slot (slot);
2812 GST_DEBUG_OBJECT (pad, "output : %p", output);
2815 reconfigure_output_stream (output, slot);
2816 msg = is_selection_done (slot->dbin);
2818 SELECTION_UNLOCK (slot->dbin);
2820 gst_element_post_message ((GstElement *) slot->dbin, msg);
2822 return GST_PAD_PROBE_REMOVE;
2825 static MultiQueueSlot *
2826 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2830 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2831 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2832 const gchar *stream_id;
2833 if (slot->active_stream) {
2834 stream_id = gst_stream_get_stream_id (slot->active_stream);
2835 if (!g_strcmp0 (sid, stream_id))
2838 if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2839 stream_id = gst_stream_get_stream_id (slot->pending_stream);
2840 if (!g_strcmp0 (sid, stream_id))
2848 /* This function handles the reassignment of a slot. Call this from
2849 * the streaming thread of a slot. */
2851 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2853 DecodebinOutputStream *output;
2854 MultiQueueSlot *target_slot = NULL;
2856 const gchar *sid, *tsid;
2858 SELECTION_LOCK (dbin);
2859 output = slot->output;
2861 if (G_UNLIKELY (slot->active_stream == NULL)) {
2862 GST_DEBUG_OBJECT (slot->src_pad,
2863 "Called on inactive slot (active_stream == NULL)");
2864 SELECTION_UNLOCK (dbin);
2868 if (G_UNLIKELY (output == NULL)) {
2869 GST_DEBUG_OBJECT (slot->src_pad,
2870 "Slot doesn't have any output to be removed");
2871 SELECTION_UNLOCK (dbin);
2875 sid = gst_stream_get_stream_id (slot->active_stream);
2876 GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2878 /* Recheck whether this stream is still in the list of streams to deactivate */
2879 if (stream_in_list (dbin->requested_selection, sid)) {
2880 /* Stream is in the list of requested streams, don't remove */
2881 SELECTION_UNLOCK (dbin);
2882 GST_DEBUG_OBJECT (slot->src_pad,
2883 "Stream '%s' doesn't need to be deactivated", sid);
2887 /* Unlink slot from output */
2888 /* FIXME : Handle flushing ? */
2889 /* FIXME : Handle outputs without decoders */
2890 GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2891 output->decoder_sink);
2892 if (output->decoder_sink)
2893 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2894 output->linked = FALSE;
2895 slot->output = NULL;
2896 output->slot = NULL;
2897 /* Remove sid from active selection */
2898 GST_DEBUG ("Removing '%s' from active_selection", sid);
2899 for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2900 if (!g_strcmp0 (sid, tmp->data)) {
2902 dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2906 /* Can we re-assign this output to a requested stream ? */
2907 GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2908 for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2909 MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2910 GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2911 tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2912 if (tslot && tslot->type == output->type && tslot->output == NULL) {
2913 GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2914 target_slot = tslot;
2916 /* Pass target stream id to requested selection */
2917 dbin->requested_selection =
2918 g_list_append (dbin->requested_selection, g_strdup (tmp->data));
2919 dbin->to_activate = g_list_delete_link (dbin->to_activate, tmp);
2925 GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2927 target_slot->output = output;
2928 output->slot = target_slot;
2929 GST_DEBUG ("Adding '%s' to active_selection", tsid);
2930 dbin->active_selection =
2931 g_list_append (dbin->active_selection, (gchar *) g_strdup (tsid));
2932 SELECTION_UNLOCK (dbin);
2934 /* Wakeup the target slot so that it retries to send events/buffers
2935 * thereby triggering the output reconfiguration codepath */
2936 gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2937 (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2938 /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2942 dbin->output_streams = g_list_remove (dbin->output_streams, output);
2943 free_output_stream (dbin, output);
2944 msg = is_selection_done (slot->dbin);
2945 SELECTION_UNLOCK (dbin);
2948 gst_element_post_message ((GstElement *) slot->dbin, msg);
2954 /* Idle probe called when a slot should be unassigned from its output stream.
2955 * This is needed to ensure nothing is flowing when unlinking the slot.
2957 * Also, this method will search for a pending stream which could re-use
2958 * the output stream. */
2959 static GstPadProbeReturn
2960 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2961 MultiQueueSlot * slot)
2963 GstDecodebin3 *dbin = slot->dbin;
2965 reassign_slot (dbin, slot);
2967 return GST_PAD_PROBE_REMOVE;
/* handle_stream_switch:
 * @dbin: the decodebin3 instance
 * @select_streams: list of stream ids requested by a SELECT_STREAMS event
 * (the third parameter is not visible in this excerpt; `seqnum` below is
 * compared against dbin->select_streams_seqnum)
 *
 * Compares the requested stream ids with the current slots and works out
 * which slots need an output, which no longer need one, and which outputs
 * can be handed from one slot to another of the same type. It then updates
 * dbin->requested_selection and dbin->to_activate under the SELECTION_LOCK
 * and installs idle probes (idle_reconfigure or slot_unassign_probe) on the
 * affected slot source pads to perform the switch.
 *
 * The lists built here hold borrowed pointers (slots, or stream-id strings
 * owned elsewhere), so they are released with g_list_free() only. */
2971 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2974 gboolean ret = TRUE;
2976 /* List of slots to (de)activate. */
2977 GList *to_deactivate = NULL;
2978 GList *to_activate = NULL;
2979 /* List of unknown stream id, most likely means the event
2980 * should be sent upstream so that elements can expose the requested stream */
2981 GList *unknown = NULL;
2982 GList *to_reassign = NULL;
2983 GList *future_request_streams = NULL;
2984 GList *pending_streams = NULL;
2985 GList *slots_to_reassign = NULL;
2987 SELECTION_LOCK (dbin);
/* Bail out if a newer SELECT_STREAMS replaced the one being handled */
2988 if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2989 GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2990 SELECTION_UNLOCK (dbin);
2993 /* Remove pending select_streams */
2994 g_list_free (dbin->pending_select_streams);
2995 dbin->pending_select_streams = NULL;
2997 /* COMPARE the requested streams to the active and requested streams
3000 /* First check the slots to activate and which ones are unknown */
3001 for (tmp = select_streams; tmp; tmp = tmp->next) {
3002 const gchar *sid = (const gchar *) tmp->data;
3003 MultiQueueSlot *slot;
3004 GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
3005 slot = find_slot_for_stream_id (dbin, sid);
3006 /* Find the corresponding slot */
3008 if (stream_in_collection (dbin, (gchar *) sid)) {
3009 pending_streams = g_list_append (pending_streams, (gchar *) sid);
3011 GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
3012 unknown = g_list_append (unknown, (gchar *) sid);
3014 } else if (slot->output == NULL) {
3015 GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
3017 to_activate = g_list_append (to_activate, slot);
3019 GST_DEBUG_OBJECT (dbin,
3020 "Stream '%s' from slot %p is already active on output %p", sid, slot,
3022 future_request_streams =
3023 g_list_append (future_request_streams, (gchar *) sid);
3027 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
3028 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3029 /* For slots that have an output, check if it's part of the streams to
3032 gboolean slot_to_deactivate = TRUE;
3034 if (slot->active_stream) {
3035 if (stream_in_list (select_streams,
3036 gst_stream_get_stream_id (slot->active_stream)))
3037 slot_to_deactivate = FALSE;
3039 if (slot_to_deactivate && slot->pending_stream
3040 && slot->pending_stream != slot->active_stream) {
3041 if (stream_in_list (select_streams,
3042 gst_stream_get_stream_id (slot->pending_stream)))
3043 slot_to_deactivate = FALSE;
3045 if (slot_to_deactivate) {
3046 GST_DEBUG_OBJECT (dbin,
3047 "Slot %p (%s) should be deactivated, no longer used", slot,
3049 active_stream ? gst_stream_get_stream_id (slot->active_stream) :
3051 to_deactivate = g_list_append (to_deactivate, slot);
3056 if (to_deactivate != NULL) {
3057 GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
3058 /* We need to compare what needs to be activated and deactivated in order
3059 * to determine whether there are outputs that can be transferred */
3060 /* Take the stream-id of the slots that are to be activated, for which there
3061 * is a slot of the same type that needs to be deactivated */
3062 tmp = to_deactivate;
3064 MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
3065 gboolean removeit = FALSE;
3067 GST_DEBUG_OBJECT (dbin,
3068 "Checking if slot to deactivate (%p) has a candidate slot to activate",
3069 slot_to_deactivate);
3070 for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
3071 MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
3072 GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
3073 if (slot_to_activate->type == slot_to_deactivate->type) {
3074 GST_DEBUG_OBJECT (dbin, "Re-using");
/* NOTE(review): slots in to_activate may have been found through their
 * pending stream only; confirm active_stream cannot be NULL here. */
3075 to_reassign = g_list_append (to_reassign, (gchar *)
3076 gst_stream_get_stream_id (slot_to_activate->active_stream));
3078 g_list_append (slots_to_reassign, slot_to_deactivate);
3079 to_activate = g_list_remove (to_activate, slot_to_activate);
3086 to_deactivate = g_list_delete_link (to_deactivate, tmp);
3091 for (tmp = to_deactivate; tmp; tmp = tmp->next) {
3092 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3093 GST_DEBUG_OBJECT (dbin,
3094 "Really need to deactivate slot %p, but no available alternative",
3097 slots_to_reassign = g_list_append (slots_to_reassign, slot);
3100 /* The only slots left to activate are the ones that won't be reassigned and
3101 * therefore really need to have a new output created */
3102 for (tmp = to_activate; tmp; tmp = tmp->next) {
3103 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3104 if (slot->active_stream)
3105 future_request_streams =
3106 g_list_append (future_request_streams,
3107 (gchar *) gst_stream_get_stream_id (slot->active_stream));
3108 else if (slot->pending_stream)
3109 future_request_streams =
3110 g_list_append (future_request_streams,
3111 (gchar *) gst_stream_get_stream_id (slot->pending_stream));
3113 GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
/* requested_selection owns its strings, hence the deep copies below */
3116 if (to_activate == NULL && pending_streams != NULL) {
3117 GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
3118 if (dbin->requested_selection)
3119 g_list_free_full (dbin->requested_selection, g_free);
3120 dbin->requested_selection =
3121 g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
3122 g_list_free (to_deactivate);
3123 g_list_free (pending_streams);
3124 to_deactivate = NULL;
3125 pending_streams = NULL;
3127 if (dbin->requested_selection)
3128 g_list_free_full (dbin->requested_selection, g_free);
3129 dbin->requested_selection =
3130 g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
3131 dbin->requested_selection =
3132 g_list_concat (dbin->requested_selection,
3133 g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
3134 if (dbin->to_activate)
3135 g_list_free (dbin->to_activate);
/* Shallow copy: dbin->to_activate shares the stream-id strings in to_reassign */
3136 dbin->to_activate = g_list_copy (to_reassign);
3139 dbin->selection_updated = TRUE;
3140 SELECTION_UNLOCK (dbin);
3143 GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
3144 g_list_free (unknown);
/* From here on the SELECTION_LOCK is no longer held; the idle probes take
 * it themselves when they run. */
3147 if (to_activate && !slots_to_reassign) {
3148 for (tmp = to_activate; tmp; tmp = tmp->next) {
3149 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3150 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
3151 (GstPadProbeCallback) idle_reconfigure, slot, NULL);
3155 /* For all streams to deactivate, add an idle probe where we will do
3156 * the unassignment and switch over */
3157 for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
3158 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3159 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
3160 (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
/* Release the temporary lists (containers only, not what they point to) */
3164 g_list_free (to_deactivate);
3166 g_list_free (to_activate);
3168 g_list_free (to_reassign);
3169 if (future_request_streams)
3170 g_list_free (future_request_streams);
3171 if (pending_streams)
3172 g_list_free (pending_streams);
3173 if (slots_to_reassign)
3174 g_list_free (slots_to_reassign);
/* ghost_pad_event_probe:
 * @pad: the output ghost pad the probe is attached to
 * @info: probe info carrying the event
 * @output: the output stream owning @pad
 *
 * Event probe that handles SELECT_STREAMS. When dbin->upstream_selected is
 * set, the event is only logged as being allowed to flow upstream.
 * Otherwise an event whose seqnum was already seen is dropped; a new one
 * records its seqnum and stream list as pending, is sent to the pad's peer
 * (or unreffed if there is no peer) and the switch is then carried out
 * with handle_stream_switch().
 *
 * Returns: GST_PAD_PROBE_OK by default, GST_PAD_PROBE_HANDLED when the
 * event has been consumed here. */
3179 static GstPadProbeReturn
3180 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
3181 DecodebinOutputStream * output)
3183 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
3184 GstDecodebin3 *dbin = output->dbin;
3185 GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
3187 GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
3189 switch (GST_EVENT_TYPE (event)) {
3190 case GST_EVENT_SELECT_STREAMS:
3193 GList *streams = NULL;
3194 guint32 seqnum = gst_event_get_seqnum (event);
3196 if (dbin->upstream_selected) {
3197 GST_DEBUG_OBJECT (pad, "Letting select-streams event flow upstream");
3201 SELECTION_LOCK (dbin);
/* The same event can arrive through several output pads; handle it once */
3202 if (seqnum == dbin->select_streams_seqnum) {
3203 SELECTION_UNLOCK (dbin);
3204 GST_DEBUG_OBJECT (pad,
3205 "Already handled/handling that SELECT_STREAMS event");
3206 gst_event_unref (event);
3207 ret = GST_PAD_PROBE_HANDLED;
3210 dbin->select_streams_seqnum = seqnum;
3211 if (dbin->pending_select_streams != NULL) {
3212 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
3213 g_list_free (dbin->pending_select_streams);
3214 dbin->pending_select_streams = NULL;
3216 gst_event_parse_select_streams (event, &streams);
/* Shallow copy: the pending list shares its strings with `streams` */
3217 dbin->pending_select_streams = g_list_copy (streams);
3218 SELECTION_UNLOCK (dbin);
3220 /* Send event upstream */
3221 if ((peer = gst_pad_get_peer (pad))) {
3222 gst_pad_send_event (peer, event);
3223 gst_object_unref (peer);
3225 gst_event_unref (event);
3227 /* Finally handle the switch */
3229 handle_stream_switch (dbin, streams, seqnum);
3230 g_list_free_full (streams, g_free);
3232 ret = GST_PAD_PROBE_HANDLED;
3243 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
3245 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3247 GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
3248 if (!dbin->upstream_selected
3249 && GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
3250 GList *streams = NULL;
3251 guint32 seqnum = gst_event_get_seqnum (event);
3253 SELECTION_LOCK (dbin);
3254 if (seqnum == dbin->select_streams_seqnum) {
3255 SELECTION_UNLOCK (dbin);
3256 GST_DEBUG_OBJECT (dbin,
3257 "Already handled/handling that SELECT_STREAMS event");
3260 dbin->select_streams_seqnum = seqnum;
3261 if (dbin->pending_select_streams != NULL) {
3262 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
3263 g_list_free (dbin->pending_select_streams);
3264 dbin->pending_select_streams = NULL;
3266 gst_event_parse_select_streams (event, &streams);
3267 dbin->pending_select_streams = g_list_copy (streams);
3268 SELECTION_UNLOCK (dbin);
3270 /* FIXME : We don't have an upstream ?? */
3272 /* Send event upstream */
3273 if ((peer = gst_pad_get_peer (pad))) {
3274 gst_pad_send_event (peer, event);
3275 gst_object_unref (peer);
3278 /* Finally handle the switch */
3280 handle_stream_switch (dbin, streams, seqnum);
3281 g_list_free_full (streams, g_free);
3284 gst_event_unref (event);
3287 return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
/* Tear down a multiqueue slot.
 *
 * Removes the probe identified by slot->probe_id from the slot's source pad,
 * unlinks the slot's input source pad (when there is one) from the slot's
 * sink pad, hands the sink pad back to the multiqueue and drops the
 * references held in sink_pad, src_pad and active_stream.
 *
 * @dbin: the decodebin3 owning the multiqueue
 * @slot: the slot to tear down */
3292 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3295 gst_pad_remove_probe (slot->src_pad, slot->probe_id);
3297 if (slot->input->srcpad)
3298 gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
/* Release the request pad while our own reference to it is still held;
 * that reference is only dropped on the next line */
3301 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
/* gst_object_replace() with NULL unrefs the stored object and clears the
 * pointer */
3302 gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
3303 gst_object_replace ((GstObject **) & slot->src_pad, NULL);
3304 gst_object_replace ((GstObject **) & slot->active_stream, NULL);
/* Asynchronous variant of free_multiqueue_slot().
 *
 * Instead of tearing down @slot in the calling thread, the work is handed to
 * gst_element_call_async(), which invokes free_multiqueue_slot (dbin, slot)
 * from another thread. No destroy notify is registered for @slot.
 *
 * @dbin: the decodebin3 owning the slot
 * @slot: the slot to tear down */
3309 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3311 GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
3312 gst_element_call_async (GST_ELEMENT_CAST (dbin),
3313 (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
3316 /* Create a DecodebinOutputStream for a given type
3317 * Note: It will be empty initially, it needs to be configured
/* Allocate a new, zero-initialised DecodebinOutputStream for @type and
 * register it with @dbin.
 *
 * The stream type selects the source pad template and the per-type pad
 * counter of @dbin (video, audio and text are tested in that order, anything
 * else uses the generic template and counter). A target-less ghost pad named
 * "<prefix>_<counter>" is created from that template and activated, an
 * upstream-event probe (ghost_pad_event_probe, with the new stream as user
 * data) is installed on the ghost pad's internal proxy pad, and the stream is
 * appended to dbin->output_streams.
 *
 * @dbin: the decodebin3 that will own the stream
 * @type: stream type flags of the new output
 *
 * NOTE(review): presumably the new stream is what gets returned - confirm. */
3319 static DecodebinOutputStream *
3320 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
3322 DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
3324 const gchar *prefix;
3325 GstStaticPadTemplate *templ;
3326 GstPadTemplate *ptmpl;
3328 GstPad *internal_pad;
3330 GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
3331 res, gst_stream_type_get_name (type));
3335 res->decoder_latency = GST_CLOCK_TIME_NONE;
/* @type is a flag set, hence the bit tests rather than equality checks */
3337 if (type & GST_STREAM_TYPE_VIDEO) {
3338 templ = &video_src_template;
3339 counter = &dbin->vpadcount;
3341 } else if (type & GST_STREAM_TYPE_AUDIO) {
3342 templ = &audio_src_template;
3343 counter = &dbin->apadcount;
3345 } else if (type & GST_STREAM_TYPE_TEXT) {
3346 templ = &text_src_template;
3347 counter = &dbin->tpadcount;
3350 templ = &src_template;
3351 counter = &dbin->opadcount;
3355 pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
/* gst_static_pad_template_get() hands back a reference to the template; it
 * is only needed while the ghost pad is created and is dropped right after */
3357 ptmpl = gst_static_pad_template_get (templ);
3358 res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
3359 gst_object_unref (ptmpl);
3361 gst_pad_set_active (res->src_pad, TRUE);
3362 /* Put an event probe on the internal proxy pad to detect upstream
3365 (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
3366 gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
3367 (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
3368 gst_object_unref (internal_pad);
3370 dbin->output_streams = g_list_append (dbin->output_streams, res);
/* Release what an output stream holds.
 *
 * Unlinks the slot's source pad from the decoder sink pad (when both a
 * decoder and its sink pad are set), breaks the association between the
 * stream and its slot in both directions, drops the decoder pad references,
 * clears the ghost pad target, removes the source pad from @dbin if it had
 * been exposed, and shuts down and removes the decoder from the bin.
 *
 * @dbin: the decodebin3 owning the stream
 * @output: the output stream to release */
3376 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
3379 if (output->decoder_sink && output->decoder)
3380 gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
/* Break the slot <-> output association in both directions */
3382 output->slot->output = NULL;
3383 output->slot = NULL;
3385 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
3386 gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
3387 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
/* Only pads that were actually added to the element need removing */
3388 if (output->src_exposed) {
3389 gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
3391 if (output->decoder) {
/* Lock the decoder's state so that state changes of the parent bin no longer
 * affect it, bring it to NULL, then take it out of the bin */
3392 gst_element_set_locked_state (output->decoder, TRUE);
3393 gst_element_set_state (output->decoder, GST_STATE_NULL);
3394 gst_bin_remove ((GstBin *) dbin, output->decoder);
3399 static GstStateChangeReturn
3400 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
3402 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3403 GstStateChangeReturn ret;
3406 switch (transition) {
3410 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3411 if (ret == GST_STATE_CHANGE_FAILURE)
3414 switch (transition) {
3415 case GST_STATE_CHANGE_PAUSED_TO_READY:
3419 /* Free output streams */
3420 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
3421 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
3422 free_output_stream (dbin, output);
3424 g_list_free (dbin->output_streams);
3425 dbin->output_streams = NULL;
3426 /* Free multiqueue slots */
3427 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
3428 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3429 free_multiqueue_slot (dbin, slot);
3431 g_list_free (dbin->slots);
3433 dbin->current_group_id = GST_GROUP_ID_INVALID;
3435 /* Reset the main input group id since it will get a new id on a new stream */
3436 dbin->main_input->group_id = GST_GROUP_ID_INVALID;
3437 /* Reset multiqueue to default interleave */
3438 g_object_set (dbin->multiqueue, "min-interleave-time",
3439 dbin->default_mq_min_interleave, NULL);
3440 dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
3441 dbin->upstream_selected = FALSE;
3442 g_list_free_full (dbin->active_selection, g_free);
3443 dbin->active_selection = NULL;