3 * Copyright (C) <2015> Centricular Ltd
4 * @author: Edward Hervey <edward@centricular.com>
5 * @author: Jan Schmidt <jan@centricular.com>
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
31 #include <gst/pbutils/pbutils.h>
33 #include "gstplayback.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
38 * SECTION:element-decodebin3
40 * #GstBin that auto-magically constructs a decoding pipeline using available
41 * decoders and demuxers via auto-plugging. The output is raw audio, video
42 * or subtitle streams.
44 * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
48 * supports publication and selection of stream information via
49 * GstStreamCollection messages and #GST_EVENT_SELECT_STREAM events.
52 * dynamically switches stream connections internally, and
53 * reuses decoder elements when stream selections change, so that in
54 * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
55 * and only creates new elements when streams change and an existing decoder
56 * is not capable of handling the new format.
59 * supports multiple input pads for the parallel decoding of auxiliary streams
60 * not muxed with the primary stream.
63 * does not handle network stream buffering. decodebin3 expects that network stream
64 * buffering is handled upstream, before data is passed to it.
68 * <emphasis>decodebin3 is still experimental API and a technology preview.
69 * Its behaviour and exposed API are subject to change.</emphasis>
76 * 1) From sink pad to elementary streams (GstParseBin)
78 * The input sink pads are fed to GstParseBin. GstParseBin will feed them
79 * through typefind. When the caps are detected (or changed) we recursively
80 * figure out which demuxer, parser or depayloader is needed until we get to
83 * All elementary streams (whether decoded or not, whether exposed or not) are
84 * fed through multiqueue. There is only *one* multiqueue in decodebin3.
86 * => MultiQueue is the cornerstone.
87 * => No buffering before multiqueue
89 * 2) Elementary streams
91 * After GstParseBin, there are 3 main components:
92 * 1) Input Streams (provided by GstParseBin)
96 * Input Streams correspond to the stream coming from GstParseBin and that gets
97 * fed into a multiqueue slot.
99 * Output Streams correspond to the combination of a (optional) decoder and an
100 * output ghostpad. Output Streams can be moved from one multiqueue slot to
101 * another, can reconfigure itself (different decoders), and can be
102 * added/removed depending on the configuration (all streams outputted, only one
103 * of each type, ...).
105 * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
106 * each 'active' Input Stream there is a corresponding slot.
107 * Slots might have different streams on input and output (due to internal
110 * Due to internal queuing/buffering/..., all those components (might) behave
111 * asynchronously. Therefore probes will be used on each component source pad to
112 * detect various key-points:
114 * the stream is done => Mark that component as done, optionally freeing/removing it
116 * a new stream is starting => link it further if needed
119 * 3) Gradual replacement
121 * If the caps change at any point in decodebin (input sink pad, demuxer output,
122 * multiqueue output, ..), we gradually replace (if needed) the following elements.
124 * This is handled by the probes in various locations:
126 * b) multiqueue input (source pad of Input Streams)
127 * c) multiqueue output (source pad of Multiqueue Slots)
128 * d) final output (target of source ghostpads)
130 * When a CAPS event arrives at those points, one of three things can happen:
131 * a) There are no elements downstream yet, just create/link-to following elements
132 * b) There are downstream elements, do an ACCEPT_CAPS query
133 * b.1) The new CAPS are accepted, keep current configuration
134 * b.2) The new CAPS are not accepted, remove following elements then do a)
141 * Input(s) Slots Streams
142 * /-------------------------------------------\ /-----\ /------------- \
144 * +-------------------------------------------------------------------------+
146 * | +---------------------------------------------+ |
147 * | | GstParseBin(s) | |
148 * | | +--------------+ | +-----+ |
149 * | | | |---[parser]-[|--| Mul |---[ decoder ]-[|
150 * |]--[ typefind ]---| demuxer(s) |------------[| | ti | |
151 * | | | (if needed) |---[parser]-[|--| qu | |
152 * | | | |---[parser]-[|--| eu |---[ decoder ]-[|
153 * | | +--------------+ | +------ ^ |
154 * | +---------------------------------------------+ ^ | |
156 * +-----------------------------------------------+--------+-------------+--+
159 * Probes --/--------/-------------/
163 * We want to ensure we re-use decoders when switching streams. This takes place
164 * at the multiqueue output level.
167 * 1) Activating a stream (i.e. linking a slot to an output) is only done within
168 * the streaming thread in the multiqueue_src_probe() and only if the
169 * stream is in the REQUESTED selection.
170 * 2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
171 * within the stream thread, but only in a purposefully called IDLE probe
172 * that calls reassign_slot().
174 * Based on those two principles, 3 "selections" of streams (stream-id) are used:
175 * 1) requested_selection
176 * All streams within that list should be activated
177 * 2) active_selection
178 * List of streams that are exposed by decodebin
180 * List of streams that will be moved to requested_selection in the
181 * reassign_slot() method (i.e. once a stream was deactivated, and the output
186 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
187 #define GST_CAT_DEFAULT decodebin3_debug
189 #define GST_TYPE_DECODEBIN3 (gst_decodebin3_get_type ())
191 #define EXTRA_DEBUG 1
193 typedef struct _GstDecodebin3 GstDecodebin3;
194 typedef struct _GstDecodebin3Class GstDecodebin3Class;
196 typedef struct _DecodebinInputStream DecodebinInputStream;
197 typedef struct _DecodebinInput DecodebinInput;
198 typedef struct _DecodebinOutputStream DecodebinOutputStream;
200 struct _GstDecodebin3
204 /* input_lock protects the following variables */
206 /* Main input (static sink pad) */
207 DecodebinInput *main_input;
208 /* Supplementary input (request sink pads) */
210 /* counter for input */
211 guint32 input_counter;
212 /* Current stream group_id (default : G_MAXUINT32) */
213 /* FIXME : Needs to be reset appropriately (when upstream changes ?) */
214 guint32 current_group_id;
215 /* End of variables protected by input_lock */
217 GstElement *multiqueue;
219 /* FIXME : Mutex for protecting values below */
220 GstStreamCollection *collection; /* Active collection */
222 GList *input_streams; /* List of DecodebinInputStream for active collection */
223 GList *output_streams; /* List of DecodebinOutputStream used for output */
224 GList *slots; /* List of MultiQueueSlot */
227 /* selection_lock protects access to following variables */
228 GMutex selection_lock;
229 /* requested selection of stream-id to activate post-multiqueue */
230 GList *requested_selection;
231 /* list of stream-id currently activated in output */
232 GList *active_selection;
233 /* List of stream-id that need to be activated (after a stream switch for ex) */
235 /* Pending select streams event */
236 guint32 select_streams_seqnum;
237 /* pending list of streams to select (from downstream) */
238 GList *pending_select_streams;
239 /* TRUE if requested_selection was updated, will become FALSE once
240 * it has fully transitioned to active */
241 gboolean selection_updated;
242 /* End of variables protected by selection_lock */
244 /* List of pending collections.
245 * FIXME : Is this really needed ? */
246 GList *pending_collection;
250 GMutex factories_lock;
251 guint32 factories_cookie;
252 /* All DECODABLE factories */
254 /* Only DECODER factories */
255 GList *decoder_factories;
256 /* DECODABLE but not DECODER factories */
257 GList *decodable_factories;
259 /* counters for pads */
260 guint32 apadcount, vpadcount, tpadcount, opadcount;
266 struct _GstDecodebin3Class
270 gint (*select_stream) (GstDecodebin3 * dbin,
271 GstStreamCollection * collection, GstStream * stream);
274 /* Input of decodebin, controls input pad and parsebin */
275 struct _DecodebinInput
282 GstPad *parsebin_sink;
284 GstStreamCollection *collection; /* Active collection */
288 GstElement *parsebin;
290 gulong pad_added_sigid;
291 gulong pad_removed_sigid;
293 /* HACK : Remove these fields */
294 /* List of PendingPad structures */
298 /* Multiqueue Slots */
299 typedef struct _MultiQueueSlot
304 /* Type of stream handled by this slot */
307 /* Linked input and output */
308 DecodebinInputStream *input;
310 /* pending => last stream received on sink pad */
311 GstStream *pending_stream;
312 /* active => last stream outputted on source pad */
313 GstStream *active_stream;
315 GstPad *sink_pad, *src_pad;
317 /* id of the MQ src_pad event probe */
324 DecodebinOutputStream *output;
327 /* Streams that are exposed downstream (i.e. output) */
328 struct _DecodebinOutputStream
331 /* The type of stream handled by this output stream */
334 /* The slot to which this output stream is currently connected to */
335 MultiQueueSlot *slot;
337 GstElement *decoder; /* Optional */
338 GstPad *decoder_sink, *decoder_src;
343 /* Flag if ghost pad is exposed */
344 gboolean src_exposed;
346 /* keyframe dropping probe */
347 gulong drop_probe_id;
350 /* Pending pads from parsebin */
351 typedef struct _PendingPad
354 DecodebinInput *input;
363 #define DEFAULT_CAPS (gst_static_caps_get (&default_raw_caps))
374 SIGNAL_SELECT_STREAM,
377 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
379 #define SELECTION_LOCK(dbin) G_STMT_START { \
380 GST_LOG_OBJECT (dbin, \
381 "selection locking from thread %p", \
383 g_mutex_lock (&dbin->selection_lock); \
384 GST_LOG_OBJECT (dbin, \
385 "selection locked from thread %p", \
389 #define SELECTION_UNLOCK(dbin) G_STMT_START { \
390 GST_LOG_OBJECT (dbin, \
391 "selection unlocking from thread %p", \
393 g_mutex_unlock (&dbin->selection_lock); \
396 #define INPUT_LOCK(dbin) G_STMT_START { \
397 GST_LOG_OBJECT (dbin, \
398 "input locking from thread %p", \
400 g_mutex_lock (&dbin->input_lock); \
401 GST_LOG_OBJECT (dbin, \
402 "input locked from thread %p", \
406 #define INPUT_UNLOCK(dbin) G_STMT_START { \
407 GST_LOG_OBJECT (dbin, \
408 "input unlocking from thread %p", \
410 g_mutex_unlock (&dbin->input_lock); \
413 GType gst_decodebin3_get_type (void);
414 #define gst_decodebin3_parent_class parent_class
415 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
417 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
419 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
422 GST_STATIC_CAPS_ANY);
424 static GstStaticPadTemplate request_sink_template =
425 GST_STATIC_PAD_TEMPLATE ("sink_%u",
428 GST_STATIC_CAPS_ANY);
430 static GstStaticPadTemplate video_src_template =
431 GST_STATIC_PAD_TEMPLATE ("video_%u",
434 GST_STATIC_CAPS_ANY);
436 static GstStaticPadTemplate audio_src_template =
437 GST_STATIC_PAD_TEMPLATE ("audio_%u",
440 GST_STATIC_CAPS_ANY);
442 static GstStaticPadTemplate text_src_template =
443 GST_STATIC_PAD_TEMPLATE ("text_%u",
446 GST_STATIC_CAPS_ANY);
448 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
451 GST_STATIC_CAPS_ANY);
454 static void gst_decodebin3_dispose (GObject * object);
455 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
456 const GValue * value, GParamSpec * pspec);
457 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
458 GValue * value, GParamSpec * pspec);
460 static gboolean parsebin_autoplug_continue_cb (GstElement *
461 parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
/* Default class handler for the "select-stream" signal (installed as
 * klass->select_stream in class_init).  It only logs that the default
 * answer, -1, is being given for @stream of @collection. */
464 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
465 GstStreamCollection * collection, GstStream * stream)
467 GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
472 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
473 GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
474 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
475 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
476 GstStateChange transition);
477 static gboolean gst_decodebin3_send_event (GstElement * element,
480 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
482 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
483 GstElementFactoryListType ftype);
486 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
487 static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input);
488 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
489 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
491 static void reconfigure_output_stream (DecodebinOutputStream * output,
492 MultiQueueSlot * slot);
493 static void free_output_stream (GstDecodebin3 * dbin,
494 DecodebinOutputStream * output);
495 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
498 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
499 GstPadProbeInfo * info, MultiQueueSlot * slot);
500 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
501 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
502 DecodebinInputStream * input);
503 static void link_input_to_slot (DecodebinInputStream * input,
504 MultiQueueSlot * slot);
505 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
506 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
507 MultiQueueSlot * slot);
509 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
510 static void update_requested_selection (GstDecodebin3 * dbin,
511 GstStreamCollection * collection);
513 /* FIXME: Really make all the parser stuff a self-contained helper object */
514 #include "gstdecodebin3-parse.c"
/* Accumulator used by the "select-stream" signal.
 *
 * Copies the integer returned by the handler that just ran into the
 * accumulated return value, except during the G_SIGNAL_RUN_CLEANUP stage. */
517 _gst_int_accumulator (GSignalInvocationHint * ihint,
518 GValue * return_accu, const GValue * handler_return, gpointer dummy)
520 gint res = g_value_get_int (handler_return);
/* Cleanup-stage handlers must not override the value already accumulated */
522 if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
523 g_value_set_int (return_accu, res);
/* Class initializer: installs the GObject vfuncs, the "caps" property, the
 * "select-stream" signal, the GstElement/GstBin vfuncs, the pad templates
 * and the element metadata. */
532 gst_decodebin3_class_init (GstDecodebin3Class * klass)
534 GObjectClass *gobject_klass = (GObjectClass *) klass;
535 GstElementClass *element_class = (GstElementClass *) klass;
536 GstBinClass *bin_klass = (GstBinClass *) klass;
538 gobject_klass->dispose = gst_decodebin3_dispose;
539 gobject_klass->set_property = gst_decodebin3_set_property;
540 gobject_klass->get_property = gst_decodebin3_get_property;
542 /* FIXME : ADD PROPERTIES ! */
543 g_object_class_install_property (gobject_klass, PROP_CAPS,
544 g_param_spec_boxed ("caps", "Caps",
545 "The caps on which to stop decoding. (NULL = default)",
546 GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
548 /* FIXME : ADD SIGNALS ! */
550 * GstDecodebin3::select-stream
551 * @decodebin: a #GstDecodebin3
552 * @collection: a #GstStreamCollection
553 * @stream: a #GstStream
555 * This signal is emitted whenever @decodebin needs to decide whether
556 * to expose a @stream of a given @collection.
558 * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
559 * A value of -1 (default) lets @decodebin decide what to do with the stream.
/* RUN_LAST signal returning a gint; the class handler is the select_stream
 * vfunc and handler results are combined by _gst_int_accumulator() */
561 gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
562 g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
563 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
564 _gst_int_accumulator, NULL, g_cclosure_marshal_generic,
565 G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
/* GstElement virtual methods */
568 element_class->request_new_pad =
569 GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
570 element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
571 element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
/* One static sink, request sinks, and one source template per stream type */
573 gst_element_class_add_pad_template (element_class,
574 gst_static_pad_template_get (&sink_template));
575 gst_element_class_add_pad_template (element_class,
576 gst_static_pad_template_get (&request_sink_template));
577 gst_element_class_add_pad_template (element_class,
578 gst_static_pad_template_get (&video_src_template));
579 gst_element_class_add_pad_template (element_class,
580 gst_static_pad_template_get (&audio_src_template));
581 gst_element_class_add_pad_template (element_class,
582 gst_static_pad_template_get (&text_src_template));
583 gst_element_class_add_pad_template (element_class,
584 gst_static_pad_template_get (&src_template));
586 gst_element_class_set_static_metadata (element_class,
587 "Decoder Bin 3", "Generic/Bin/Decoder",
588 "Autoplug and decode to raw media",
589 "Edward Hervey <edward@centricular.com>");
591 bin_klass->handle_message = gst_decodebin3_handle_message;
/* Default handler for the "select-stream" signal */
593 klass->select_stream = gst_decodebin3_select_stream;
/* Instance initializer: creates the main input (static "sink" ghost pad) and
 * the single multiqueue, then initializes the group id, the locks and the
 * default target caps.
 *
 * NOTE(review): create_new_input() is documented as "Call with INPUT_LOCK
 * taken" but is called here before input_lock is initialized. */
597 gst_decodebin3_init (GstDecodebin3 * dbin)
599 /* Create main input */
600 dbin->main_input = create_new_input (dbin, TRUE);
/* NOTE(review): the result of gst_element_factory_make() is used without a
 * NULL check */
602 dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
603 g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
604 "max-size-buffers", 0, "use-interleave", TRUE, NULL);
605 gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
/* G_MAXUINT32 means "no group id seen yet" (see set_input_group_id()) */
607 dbin->current_group_id = G_MAXUINT32;
609 g_mutex_init (&dbin->factories_lock);
610 g_mutex_init (&dbin->selection_lock);
611 g_mutex_init (&dbin->input_lock);
/* Takes a reference on the default raw caps */
613 dbin->caps = gst_static_caps_get (&default_raw_caps);
615 GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
619 gst_decodebin3_dispose (GObject * object)
621 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
625 gst_plugin_feature_list_free (dbin->factories);
626 if (dbin->decoder_factories)
627 g_list_free (dbin->decoder_factories);
628 if (dbin->decodable_factories)
629 g_list_free (dbin->decodable_factories);
630 g_list_free (dbin->requested_selection);
631 g_list_free (dbin->active_selection);
632 g_list_free (dbin->to_activate);
633 g_list_free (dbin->pending_select_streams);
634 g_clear_object (&dbin->collection);
636 free_input (dbin, dbin->main_input);
638 for (walk = dbin->other_inputs; walk; walk = next) {
639 DecodebinInput *input = walk->data;
641 next = g_list_next (walk);
643 free_input (dbin, input);
644 dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
647 G_OBJECT_CLASS (parent_class)->dispose (object);
/* GObject::set_property implementation.
 *
 * The visible code stores a copy of the boxed caps @value in dbin->caps,
 * releasing the previous caps, under the object lock.  Unknown property ids
 * are reported with G_OBJECT_WARN_INVALID_PROPERTY_ID. */
651 gst_decodebin3_set_property (GObject * object, guint prop_id,
652 const GValue * value, GParamSpec * pspec)
654 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
656 /* FIXME : IMPLEMENT */
/* Swap the caps under the object lock; get_property() reads them under the
 * same lock */
659 GST_OBJECT_LOCK (dbin);
661 gst_caps_unref (dbin->caps);
662 dbin->caps = g_value_dup_boxed (value);
663 GST_OBJECT_UNLOCK (dbin);
666 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject::get_property implementation.
 *
 * The visible code stores dbin->caps into @value under the object lock.
 * Unknown property ids are reported with
 * G_OBJECT_WARN_INVALID_PROPERTY_ID. */
672 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
675 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
677 /* FIXME : IMPLEMENT */
680 GST_OBJECT_LOCK (dbin);
681 g_value_set_boxed (value, dbin->caps);
682 GST_OBJECT_UNLOCK (dbin);
685 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Handler for the "autoplug-continue" signal of each parsebin (connected in
 * ensure_input_parsebin()).  Compares the @caps found on @pad with the
 * target caps of @dbin.
 *
 * NOTE(review): dbin->caps is read here without GST_OBJECT_LOCK, while
 * set_property() replaces it under that lock. */
691 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
692 GstCaps * caps, GstDecodebin3 * dbin)
694 GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
696 /* If it matches our target caps, expose it */
697 if (gst_caps_can_intersect (caps, dbin->caps))
703 /* This method should be called whenever a STREAM_START event
704 * comes out of a given parsebin.
705 * The caller shall replace the group_id if the function returns TRUE */
/* @group_id is an in/out parameter: it is recorded on @input, compared with
 * the group id currently used by the bin and, if different, overwritten
 * with dbin->current_group_id. */
707 set_input_group_id (DecodebinInput * input, guint32 * group_id)
709 GstDecodebin3 *dbin = input->dbin;
/* Remember the group id seen on this input; warn when it replaces one that
 * was already set (G_MAXUINT32 is the initial, unset value) */
711 if (input->group_id != *group_id) {
712 if (input->group_id != G_MAXUINT32)
713 GST_WARNING_OBJECT (dbin,
714 "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
715 ") on input %p ", input->group_id, *group_id, input);
716 input->group_id = *group_id;
/* The first group id seen becomes the group id of the whole bin */
719 if (*group_id != dbin->current_group_id) {
720 if (dbin->current_group_id == G_MAXUINT32) {
721 GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT,
723 dbin->current_group_id = *group_id;
/* Hand the bin-wide group id back to the caller */
725 *group_id = dbin->current_group_id;
732 /* Call with INPUT_LOCK taken */
/* Makes sure @input has a parsebin that is a child of @dbin and that the
 * input ghost pad targets the parsebin sink pad.  If the "parsebin" element
 * cannot be created, a missing-element message is posted on @dbin. */
734 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
736 gboolean set_state = FALSE;
/* Lazily create the parsebin and connect its signals */
738 if (input->parsebin == NULL) {
739 input->parsebin = gst_element_factory_make ("parsebin", NULL);
740 if (input->parsebin == NULL)
/* Keep our own reference; it is dropped in the unlink function or in
 * free_input() */
742 input->parsebin = gst_object_ref (input->parsebin);
743 input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
744 input->pad_added_sigid =
745 g_signal_connect (input->parsebin, "pad-added",
746 (GCallback) parsebin_pad_added_cb, input);
747 input->pad_removed_sigid =
748 g_signal_connect (input->parsebin, "pad-removed",
749 (GCallback) parsebin_pad_removed_cb, input);
/* The handler id of this connection is not stored */
750 g_signal_connect (input->parsebin, "autoplug-continue",
751 (GCallback) parsebin_autoplug_continue_cb, dbin);
/* Add the parsebin to the bin if it is not already one of our children */
754 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
755 gst_bin_add (GST_BIN (dbin), input->parsebin);
/* Route the data arriving on the ghost pad into the parsebin */
759 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
760 input->parsebin_sink);
762 gst_element_sync_state_with_parent (input->parsebin);
/* Error path: tell the application that "parsebin" is missing */
769 gst_element_post_message ((GstElement *) dbin,
770 gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
/* Link function of the input ghost pads (installed in create_new_input()).
 *
 * Looks up the DecodebinInput attached to @pad under the "decodebin.input"
 * key and makes sure it has a parsebin.  The link is refused when the
 * parsebin cannot be set up or when no input is attached to the pad. */
775 static GstPadLinkReturn
776 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
778 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
779 GstPadLinkReturn res = GST_PAD_LINK_OK;
780 DecodebinInput *input;
782 GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT
783 ". Creating parsebin if needed", pad);
/* The input was attached to the ghost pad when it was created */
785 if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
789 if (!ensure_input_parsebin (dbin, input))
790 res = GST_PAD_LINK_REFUSED;
/* Error path: the pad carries no input */
795 GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
796 return GST_PAD_LINK_REFUSED;
800 gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
802 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
803 DecodebinInput *input;
805 GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT
806 ". Removing parsebin.", pad);
808 if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
812 if (input->parsebin == NULL) {
817 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) {
818 GstStreamCollection *collection = NULL;
820 /* Clear stream-collection corresponding to current INPUT and post new
821 * stream-collection message, if needed */
822 if (input->collection) {
823 gst_object_unref (input->collection);
824 input->collection = NULL;
827 collection = get_merged_collection (dbin);
828 if (collection && collection != dbin->collection) {
830 GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
832 if (dbin->collection)
833 gst_object_unref (dbin->collection);
834 dbin->collection = collection;
837 gst_message_new_stream_collection ((GstObject *) dbin,
840 gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
841 update_requested_selection (dbin, dbin->collection);
844 gst_bin_remove (GST_BIN (dbin), input->parsebin);
845 gst_element_set_state (input->parsebin, GST_STATE_NULL);
846 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
847 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
848 gst_object_unref (input->parsebin);
849 gst_object_unref (input->parsebin_sink);
851 input->parsebin = NULL;
852 input->parsebin_sink = NULL;
854 if (!input->is_main) {
855 dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
856 free_input_async (dbin, input);
863 GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
/* Tears down @input: untargets and removes its ghost pad from @dbin, shuts
 * down and releases its parsebin (if any) and drops its stream collection.
 *
 * Also used as the callback of gst_element_call_async() in
 * free_input_async(). */
868 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
870 GST_DEBUG ("Freeing input %p", input);
871 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
872 gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
873 if (input->parsebin) {
/* Disconnect the pad signals before the parsebin is brought to NULL */
874 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
875 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
876 gst_element_set_state (input->parsebin, GST_STATE_NULL);
/* Drop the references taken in ensure_input_parsebin() */
877 gst_object_unref (input->parsebin);
878 gst_object_unref (input->parsebin_sink);
880 if (input->collection)
881 gst_object_unref (input->collection);
/* Schedules free_input() for @input through gst_element_call_async()
 * instead of running it in the calling context.  free_input() is cast to
 * GstElementCallAsyncFunc, with @dbin as the element and @input as the user
 * data. */
886 free_input_async (GstDecodebin3 * dbin, DecodebinInput * input)
888 GST_LOG_OBJECT (dbin, "pushing input %p on thread pool to free", input);
889 gst_element_call_async (GST_ELEMENT_CAST (dbin),
890 (GstElementCallAsyncFunc) free_input, input, NULL);
893 /* Call with INPUT_LOCK taken */
/* Allocates a zero-initialized DecodebinInput and its sink ghost pad, which
 * is created without a target, activated and added to @dbin.  The pad is
 * named "sink" or "sink_%u" (numbered from dbin->input_counter).  The input
 * is attached to the pad under the "decodebin.input" key so the link/unlink
 * functions can find it. */
894 static DecodebinInput *
895 create_new_input (GstDecodebin3 * dbin, gboolean main)
897 DecodebinInput *input;
899 input = g_new0 (DecodebinInput, 1);
901 input->is_main = main;
/* G_MAXUINT32 marks the group id as not set yet */
902 input->group_id = G_MAXUINT32;
904 input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
906 gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
907 input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
910 g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
911 gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
912 gst_pad_set_unlink_function (input->ghost_sink,
913 gst_decodebin3_input_pad_unlink);
915 gst_pad_set_active (input->ghost_sink, TRUE);
916 gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
/* GstElement::request_new_pad implementation.
 *
 * Creates a new non-main input, records it in dbin->other_inputs and uses
 * its sink ghost pad as the result.  @temp, @name and @caps are not used by
 * the visible code. */
923 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
924 const gchar * name, const GstCaps * caps)
926 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
927 DecodebinInput *input;
930 /* We are ignoring names for the time being, not sure it makes any sense
931 * within the context of decodebin3 ... */
933 input = create_new_input (dbin, FALSE);
935 dbin->other_inputs = g_list_append (dbin->other_inputs, input);
936 res = input->ghost_sink;
943 /* Must be called with factories lock! */
/* Refreshes the cached element factory lists when there is no cached list
 * yet or when the registry feature-list cookie has changed.
 *
 * dbin->factories owns the features; decoder_factories and
 * decodable_factories are filled with the same factory pointers, split by
 * whether the factory is of type DECODER. */
945 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
949 cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
950 if (!dbin->factories || dbin->factories_cookie != cookie) {
/* Drop the stale lists before rebuilding them */
953 gst_plugin_feature_list_free (dbin->factories);
954 if (dbin->decoder_factories)
955 g_list_free (dbin->decoder_factories);
956 if (dbin->decodable_factories)
957 g_list_free (dbin->decodable_factories);
/* All DECODABLE factories of at least MARGINAL rank, sorted with
 * gst_plugin_feature_rank_compare_func */
959 gst_element_factory_list_get_elements
960 (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
962 g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
963 dbin->factories_cookie = cookie;
965 /* Filter decoder and other decodables */
966 dbin->decoder_factories = NULL;
967 dbin->decodable_factories = NULL;
/* NOTE(review): g_list_append() inside the loop walks the list on every
 * insertion */
968 for (tmp = dbin->factories; tmp; tmp = tmp->next) {
969 GstElementFactory *fact = (GstElementFactory *) tmp->data;
970 if (gst_element_factory_list_is_type (fact,
971 GST_ELEMENT_FACTORY_TYPE_DECODER))
972 dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
974 dbin->decodable_factories =
975 g_list_append (dbin->decodable_factories, fact);
980 /* Must be called with appropriate lock if list is a protected variable */
/* Searches @list, whose entries are stream-id strings, for an entry equal
 * to @sid.  Strings are compared with g_strcmp0(). */
982 stream_in_list (GList * list, const gchar * sid)
/* First pass: only logs every comparison about to be made */
987 for (tmp = list; tmp; tmp = tmp->next) {
988 gchar *osid = (gchar *) tmp->data;
989 GST_DEBUG ("Checking %s against %s", sid, osid);
/* Second pass: the actual lookup */
993 for (tmp = list; tmp; tmp = tmp->next) {
994 gchar *osid = (gchar *) tmp->data;
995 if (!g_strcmp0 (sid, osid))
/* Computes a new requested selection from @collection.
 *
 * Nothing is computed while a SELECT_STREAMS is pending.  Otherwise a stream
 * is picked when the "select-stream" signal answers 1 for it, or when the
 * signal answers -1 and the stream id is already in the requested or active
 * selection.  Then, for every stream type not covered yet, the first stream
 * of that type in the collection is added.
 *
 * The list stores the stream-id pointers returned by
 * gst_stream_get_stream_id() without copying them.
 *
 * NOTE(review): the "select-stream" signal is emitted while SELECTION_LOCK
 * is held. */
1003 update_requested_selection (GstDecodebin3 * dbin,
1004 GstStreamCollection * collection)
1008 GstStreamType used_types = 0;
1010 nb = gst_stream_collection_get_size (collection);
1012 /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1013 * the switch handler will take care of the pending selection */
1014 SELECTION_LOCK (dbin);
1015 if (dbin->pending_select_streams) {
1016 GST_DEBUG_OBJECT (dbin,
1017 "No need to create pending selection, SELECT_STREAMS underway");
1021 /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1022 GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1024 /* 3. If not, check if we already have some of the streams in the
1025 * existing active/requested selection */
1026 for (i = 0; i < nb; i++) {
1027 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1028 const gchar *sid = gst_stream_get_stream_id (stream);
1030 /* Fire select-stream signal to see if outside components want to
1031 * hint at which streams should be selected */
1032 g_signal_emit (G_OBJECT (dbin),
1033 gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1035 GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
/* 1 = explicitly requested by a handler, -1 = no opinion: keep the stream
 * only if it was already selected */
1036 if (request == 1 || (request == -1
1037 && (stream_in_list (dbin->requested_selection, sid)
1038 || stream_in_list (dbin->active_selection, sid)))) {
1039 GstStreamType curtype = gst_stream_get_stream_type (stream);
1041 GST_DEBUG_OBJECT (dbin,
1042 "Using stream requested by 'select-stream' signal : %s", sid);
1044 GST_DEBUG_OBJECT (dbin,
1045 "Re-using stream already present in requested or active selection : %s",
/* The stream id is borrowed from the stream, not duplicated */
1047 tmp = g_list_append (tmp, (gchar *) sid);
1048 used_types |= curtype;
1052 /* 4. If not, match one stream of each type */
1053 for (i = 0; i < nb; i++) {
1054 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1055 GstStreamType curtype = gst_stream_get_stream_type (stream);
/* Only types not covered by step 3 get a default stream */
1056 if (!(used_types & curtype)) {
1057 const gchar *sid = gst_stream_get_stream_id (stream);
1058 GST_DEBUG_OBJECT (dbin, "Selecting stream '%s' of type %s",
1059 sid, gst_stream_type_get_name (curtype));
1060 tmp = g_list_append (tmp, (gchar *) sid);
1061 used_types |= curtype;
1066 /* Finally set the requested selection */
1068 if (dbin->requested_selection) {
1069 GST_FIXME_OBJECT (dbin,
1070 "Replacing non-NULL requested_selection, what should we do ??");
1071 g_list_free (dbin->requested_selection);
1073 dbin->requested_selection = tmp;
1074 dbin->selection_updated = TRUE;
1076 SELECTION_UNLOCK (dbin);
1079 /* Call with INPUT_LOCK taken */
/* Returns the stream collection describing all inputs.
 *
 * When no merge is needed, a new reference to the single existing input
 * collection is returned, or NULL if there is none.  Otherwise a new
 * collection named "decodebin3" is created and filled with the streams of
 * the main input followed by those of the other inputs.
 *
 * In both cases a non-NULL result is a reference owned by the caller. */
1080 static GstStreamCollection *
1081 get_merged_collection (GstDecodebin3 * dbin)
1083 gboolean needs_merge = FALSE;
1084 GstStreamCollection *res = NULL;
1088 /* First check if we need to do a merge or just return the only collection */
1089 res = dbin->main_input->collection;
1091 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1092 DecodebinInput *input = (DecodebinInput *) tmp->data;
1093 if (input->collection) {
1098 res = input->collection;
1103 GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
/* Hand out a new reference, the input keeps its own */
1104 return res ? gst_object_ref (res) : NULL;
1107 /* We really need to create a new collection */
1108 /* FIXME : Some numbering scheme maybe ?? */
1109 res = gst_stream_collection_new ("decodebin3");
/* Streams of the main input come first */
1110 if (dbin->main_input->collection) {
1111 nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1112 GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1113 for (i = 0; i < nb_stream; i++) {
1115 gst_stream_collection_get_stream (dbin->main_input->collection, i);
/* Each stream added to the merged collection gets its own reference */
1116 gst_stream_collection_add_stream (res, gst_object_ref (stream));
/* Then the streams of every other input, in list order */
1120 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1121 DecodebinInput *input = (DecodebinInput *) tmp->data;
1122 GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1124 if (input->collection) {
1125 nb_stream = gst_stream_collection_get_size (input->collection);
1126 GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1127 for (i = 0; i < nb_stream; i++) {
1129 gst_stream_collection_get_stream (input->collection, i);
1130 gst_stream_collection_add_stream (res, gst_object_ref (stream));
/* find_message_parsebin:
 * @dbin: the decodebin3 instance
 * @child: element to start the search from
 *
 * Walks up the parent chain starting at @child (a reference is taken on it
 * first). At each step the current element is compared against the parsebin
 * of dbin->main_input and of every entry in dbin->other_inputs. The walk
 * stops when there is no parent left or when @dbin itself is reached.
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably the matching DecodebinInput (`input`, NULL when nothing
 * matched) is returned -- confirm.
 */
1138 /* Call with INPUT_LOCK taken */
1139 static DecodebinInput *
1140 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1142 DecodebinInput *input = NULL;
1143 GstElement *parent = gst_object_ref (child);
1147 GstElement *next_parent;
1149 GST_DEBUG_OBJECT (dbin, "parent %s",
1150 parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1152 if (parent == dbin->main_input->parsebin) {
1153 input = dbin->main_input;
1156 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1157 DecodebinInput *cur = (DecodebinInput *) tmp->data;
1158 if (parent == cur->parsebin) {
/* Step one level up: fetch the next parent before dropping the reference
 * held on the current element. */
1163 next_parent = (GstElement *) gst_element_get_parent (parent);
1164 gst_object_unref (parent);
1165 parent = next_parent;
1167 } while (parent && parent != (GstElement *) dbin);
1170 gst_object_unref (parent);
/* stream_in_collection:
 * @dbin: the decodebin3 instance
 * @sid: stream-id to look for
 *
 * Scans dbin->collection for a stream whose stream-id compares equal to
 * @sid (g_strcmp0). A NULL dbin->collection is handled as a separate early
 * case before the scan.
 * NOTE(review): the return type and return statements are not visible in
 * this listing; presumably a boolean "found" result -- confirm.
 */
1176 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1180 if (dbin->collection == NULL)
1182 len = gst_stream_collection_get_size (dbin->collection);
1183 for (i = 0; i < len; i++) {
1184 GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1185 const gchar *osid = gst_stream_get_stream_id (stream);
1186 if (!g_strcmp0 (sid, osid))
/* handle_stream_collection:
 * @dbin: the decodebin3 instance
 * @collection: the collection that was just received
 * @child: the element the collection came from
 *
 * Looks up the input owning @child with find_message_parsebin(); when none
 * is found a debug message is logged ("most likely shutting down").
 * Otherwise the input's previous collection is released and replaced by a
 * new reference to @collection, the merged view of all inputs is computed
 * with get_merged_collection(), and that merged collection is stored in
 * dbin->collection (releasing the previously stored one, if any).
 *
 * When debugging is compiled in, every stream of the merged collection is
 * dumped (id, type, flags, tags, caps).
 */
1193 /* Call with INPUT_LOCK taken */
1195 handle_stream_collection (GstDecodebin3 * dbin,
1196 GstStreamCollection * collection, GstElement * child)
1198 #ifndef GST_DISABLE_GST_DEBUG
1199 const gchar *upstream_id;
1202 DecodebinInput *input = find_message_parsebin (dbin, child);
1205 GST_DEBUG_OBJECT (dbin,
1206 "Couldn't find corresponding input, most likely shutting down");
1210 /* Replace collection in input */
1211 if (input->collection)
1212 gst_object_unref (input->collection);
1213 input->collection = gst_object_ref (collection);
1214 GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1217 /* Merge collection if needed */
1218 collection = get_merged_collection (dbin);
/* From here on `collection` is the merged collection returned above, no
 * longer the caller's argument. */
1220 #ifndef GST_DISABLE_GST_DEBUG
1221 /* Just some debugging */
1222 upstream_id = gst_stream_collection_get_upstream_id (collection);
1223 GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1224 GST_DEBUG ("From input %p", input);
1225 GST_DEBUG (" %d streams", gst_stream_collection_get_size (collection));
1226 for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1227 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1228 GstTagList *taglist;
1231 GST_DEBUG (" Stream '%s'", gst_stream_get_stream_id (stream));
1232 GST_DEBUG (" type : %s",
1233 gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1234 GST_DEBUG (" flags : 0x%x", gst_stream_get_stream_flags (stream));
1235 taglist = gst_stream_get_tags (stream);
1236 GST_DEBUG (" tags : %" GST_PTR_FORMAT, taglist);
1237 caps = gst_stream_get_caps (stream);
1238 GST_DEBUG (" caps : %" GST_PTR_FORMAT, caps);
1240 gst_tag_list_unref (taglist);
1242 gst_caps_unref (caps);
1246 /* Store collection for later usage */
1247 if (dbin->collection == NULL) {
1248 dbin->collection = collection;
1250 /* We need to check who emitted this collection (the owner).
1251 * If we already had a collection from that user, this one is an update,
1252 * that is to say that we need to figure out how we are going to re-use
1253 * the streams/slot */
1254 GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1255 /* FIXME : When do we switch from pending collection to active collection ?
1256 * When all streams from active collection are drained in multiqueue output ? */
1257 gst_object_unref (dbin->collection);
1258 dbin->collection = collection;
1259 /* dbin->pending_collection = */
1260 /* g_list_append (dbin->pending_collection, collection); */
/* gst_decodebin3_handle_message:
 * @bin: the bin (cast to GstDecodebin3 below)
 * @message: the message being handled
 *
 * Message handler for decodebin3 (presumably installed as the GstBin
 * handle_message vfunc -- confirm against class_init).
 *
 * STREAM_COLLECTION messages are parsed and passed to
 * handle_stream_collection(). If afterwards dbin->collection is set and is
 * not the collection carried by the message, a new stream-collection message
 * with @dbin as source is built and the original message is released.
 *
 * The message is then passed to the parent class handler. If a collection
 * was handled, update_requested_selection() is called with dbin->collection
 * once the parent handler has run.
 */
1265 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1267 GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1268 gboolean posting_collection = FALSE;
1270 GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1272 switch (GST_MESSAGE_TYPE (message)) {
1273 case GST_MESSAGE_STREAM_COLLECTION:
1275 GstStreamCollection *collection = NULL;
1276 gst_message_parse_stream_collection (message, &collection);
1279 handle_stream_collection (dbin, collection,
1280 (GstElement *) GST_MESSAGE_SRC (message));
1281 posting_collection = TRUE;
1282 INPUT_UNLOCK (dbin);
1284 if (dbin->collection && collection != dbin->collection) {
1285 /* Replace collection message, we most likely aggregated it */
1286 GstMessage *new_msg;
1288 gst_message_new_stream_collection ((GstObject *) dbin,
1290 gst_message_unref (message);
1294 gst_object_unref (collection);
1301 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1303 if (posting_collection) {
1304 /* Figure out a selection for that collection */
1305 update_requested_selection (dbin, dbin->collection);
/* find_free_compatible_output:
 * @dbin: the decodebin3 instance
 * @stream: the stream an output is wanted for
 *
 * Iterates dbin->output_streams looking for an output that has the same
 * stream type as @stream and is attached to a slot with an active stream
 * whose stream-id is NOT in dbin->requested_selection.
 * NOTE(review): the return statements are not visible in this listing;
 * presumably the first such output is returned, NULL otherwise -- confirm.
 */
1309 static DecodebinOutputStream *
1310 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1313 GstStreamType stype = gst_stream_get_stream_type (stream);
1315 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1316 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1317 if (output->type == stype && output->slot && output->slot->active_stream) {
1318 GstStream *tstream = output->slot->active_stream;
1319 if (!stream_in_list (dbin->requested_selection,
1320 (gchar *) gst_stream_get_stream_id (tstream))) {
/* get_output_for_slot:
 * @slot: the multiqueue slot to find (or create) an output for
 *
 * Summary of the visible logic:
 *  - a slot that already has an output simply gets that output back;
 *  - otherwise the stream-id and caps of slot->active_stream are logged;
 *  - if the stream-id is in dbin->requested_selection, an existing output
 *    is looked up with find_free_compatible_output(). On the "steal" path
 *    the stream-id is appended to dbin->to_activate, removed from
 *    dbin->requested_selection, and an idle probe running
 *    slot_unassign_probe is installed on the other slot's source pad with
 *    the selection lock released around that call. On the other path a new
 *    output is created with create_output_stream(), cross-linked with
 *    @slot, and the stream-id is appended to dbin->active_selection;
 *  - if the stream-id is not requested, no output is created.
 */
1329 /* Give a certain slot, figure out if it should be linked to an
1331 * CALL WITH SELECTION LOCK TAKEN !*/
1332 static DecodebinOutputStream *
1333 get_output_for_slot (MultiQueueSlot * slot)
1335 GstDecodebin3 *dbin = slot->dbin;
1336 DecodebinOutputStream *output = NULL;
1337 const gchar *stream_id;
1340 /* If we already have a configured output, just use it */
1341 if (slot->output != NULL)
1342 return slot->output;
1347 * This method needs to be split into multiple parts
1349 * 1) Figure out whether stream should be exposed or not
1350 * This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1351 * in the default stream attribution
1353 * 2) Figure out whether an output stream should be created, whether
1354 * we can re-use the output stream already linked to the slot, or
1355 * whether we need to get re-assigned another (currently used) output
1359 stream_id = gst_stream_get_stream_id (slot->active_stream);
1360 caps = gst_stream_get_caps (slot->active_stream);
1361 GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
1362 gst_caps_unref (caps);
1364 /* 0. Emit autoplug-continue signal for pending caps ? */
1365 GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
1367 /* 1. if in EXPOSE_ALL_MODE, just accept */
1368 GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
1371 /* FIXME : The idea around this was to avoid activating a stream for
1372 * which we have no decoder. Unfortunately it is way too
1373 * expensive. Need to figure out a better solution */
/* NOTE(review): `caps` is referenced in the check below after the
 * gst_caps_unref() above. The FIXME suggests this check may be compiled
 * out (no preprocessor guard is visible in this listing) -- confirm before
 * re-enabling or touching it. */
1374 /* 2. Is there a potential decoder (if one is required) */
1375 if (!gst_caps_can_intersect (caps, dbin->caps)
1376 && !have_factory (dbin, (GstCaps *) caps,
1377 GST_ELEMENT_FACTORY_TYPE_DECODER)) {
1378 GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
1380 SELECTION_UNLOCK (dbin);
1381 gst_element_post_message (GST_ELEMENT_CAST (dbin),
1382 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1383 SELECTION_LOCK (dbin);
1388 /* 3. In default mode check if we should expose */
1389 if (stream_in_list (dbin->requested_selection, stream_id)) {
1390 /* Check if we can steal an existing output stream we could re-use.
1392 * * an output stream whose slot->stream is not in requested
1393 * * and is of the same type as this stream
1395 output = find_free_compatible_output (dbin, slot->active_stream);
1397 /* Move this output from its current slot to this slot */
1399 g_list_append (dbin->to_activate, (gchar *) stream_id);
1400 dbin->requested_selection =
1401 g_list_remove (dbin->requested_selection, stream_id);
1402 SELECTION_UNLOCK (dbin);
1403 gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1404 (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
1405 SELECTION_LOCK (dbin);
1409 output = create_output_stream (dbin, slot->type);
1410 output->slot = slot;
1411 GST_DEBUG ("Linking slot %p to new output %p", slot, output);
1412 slot->output = output;
1413 dbin->active_selection =
1414 g_list_append (dbin->active_selection, (gchar *) stream_id);
1416 GST_DEBUG ("Not creating any output for slot %p", slot);
/* is_selection_done:
 * @dbin: the decodebin3 instance
 *
 * Checks, in this order:
 *  - dbin->selection_updated must be set;
 *  - dbin->to_activate must be empty;
 *  - every stream-id in dbin->requested_selection must also be in
 *    dbin->active_selection.
 * When all checks pass, a streams-selected message is built for
 * dbin->collection, stamped with dbin->select_streams_seqnum, and the active
 * stream of every output's slot is added to it (an output with no valid slot
 * only produces a warning). dbin->selection_updated is then cleared.
 */
1421 /* Returns SELECTED_STREAMS message if active_selection is equal to
1422 * requested_selection, else NULL.
1423 * Must be called with LOCK taken */
1425 is_selection_done (GstDecodebin3 * dbin)
1430 if (!dbin->selection_updated)
1433 GST_LOG_OBJECT (dbin, "Checking");
1435 if (dbin->to_activate != NULL) {
1436 GST_DEBUG ("Still have streams to activate");
1439 for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
1440 GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
1441 if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
1442 GST_DEBUG ("Not in active selection, returning");
1447 GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
1449 /* We are completely active */
1450 msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
1451 GST_MESSAGE_SEQNUM (msg) = dbin->select_streams_seqnum;
1452 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1453 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1455 GST_DEBUG_OBJECT (dbin, "Adding stream %s",
1456 gst_stream_get_stream_id (output->slot->active_stream));
1458 gst_message_streams_selected_add (msg, output->slot->active_stream);
1460 GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
1462 dbin->selection_updated = FALSE;
/* multiqueue_src_probe:
 * @pad: the multiqueue source pad the probe is attached to
 * @info: probe info carrying either an event or a query
 * @slot: the slot owning @pad
 *
 * Probe callback for a slot's multiqueue source pad (create_new_slot()
 * installs it for downstream events and downstream queries).
 *
 * Events:
 *  - STREAM_START: parses the GstStream from the event, clears
 *    slot->is_drained and updates slot->active_stream;
 *  - CAPS: under the selection lock, fetches the output with
 *    get_output_for_slot(), reconfigures it, and posts the message returned
 *    by is_selection_done() after the lock is released;
 *  - what appears to be the EOS case (its case label is not visible in this
 *    listing): marks the slot drained. When slot->input is NULL the event is
 *    forwarded to the peer pad, the output and the slot are removed, and
 *    the probe returns GST_PAD_PROBE_HANDLED;
 *  - custom downstream event named "decodebin3-custom-eos": marks the slot
 *    drained, removes the output and the slot when slot->input is NULL, and
 *    returns GST_PAD_PROBE_DROP.
 *
 * Queries:
 *  - CAPS is answered with GST_CAPS_ANY;
 *  - ACCEPT_CAPS is answered TRUE;
 *  both return GST_PAD_PROBE_HANDLED.
 *
 * Anything else keeps the initial GST_PAD_PROBE_OK.
 */
1466 static GstPadProbeReturn
1467 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
1468 MultiQueueSlot * slot)
1470 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
1471 GstDecodebin3 *dbin = slot->dbin;
1473 if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
1474 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
1476 GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
1477 switch (GST_EVENT_TYPE (ev)) {
1478 case GST_EVENT_STREAM_START:
1480 GstStream *stream = NULL;
1481 const gchar *stream_id;
1483 gst_event_parse_stream (ev, &stream);
1484 if (stream == NULL) {
1485 GST_ERROR_OBJECT (pad,
1486 "Got a STREAM_START event without a GstStream");
1489 slot->is_drained = FALSE;
1490 stream_id = gst_stream_get_stream_id (stream);
1491 GST_DEBUG_OBJECT (pad, "Stream Start '%s'", stream_id);
/* The parsed `stream` is either stored in slot->active_stream (releasing
 * the previous one when it differs) or unreffed right away. */
1492 if (slot->active_stream == NULL) {
1493 slot->active_stream = stream;
1494 } else if (slot->active_stream != stream) {
1495 GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
1496 gst_stream_get_stream_id (slot->active_stream),
1497 gst_stream_get_stream_id (stream));
1498 gst_object_unref (slot->active_stream);
1499 slot->active_stream = stream;
1501 gst_object_unref (stream);
1502 #if 0 /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
1504 gboolean is_active, is_requested;
1505 /* Quick check to see if we're in the current selection */
1506 /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
1507 SELECTION_LOCK (dbin);
1508 GST_DEBUG_OBJECT (dbin, "Checking active selection");
1509 is_active = stream_in_list (dbin->active_selection, stream_id);
1510 GST_DEBUG_OBJECT (dbin, "Checking requested selection");
1511 is_requested = stream_in_list (dbin->requested_selection, stream_id);
1512 SELECTION_UNLOCK (dbin);
1514 GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
1517 GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
1519 else if (slot->output) {
1520 GST_DEBUG_OBJECT (pad,
1521 "Slot needs to be deactivated ? It's no longer in requested selection");
1522 } else if (!is_active)
1523 GST_DEBUG_OBJECT (pad,
1524 "Slot in neither active nor requested selection");
1529 case GST_EVENT_CAPS:
1531 /* Configure the output slot if needed */
1532 DecodebinOutputStream *output;
1533 GstMessage *msg = NULL;
1534 SELECTION_LOCK (dbin);
1535 output = get_output_for_slot (slot);
1537 reconfigure_output_stream (output, slot);
1538 msg = is_selection_done (dbin);
1540 SELECTION_UNLOCK (dbin);
1542 gst_element_post_message ((GstElement *) slot->dbin, msg);
1546 /* FIXME : Figure out */
1547 GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
1549 slot->is_drained = TRUE;
1550 if (slot->input == NULL) {
1552 GST_DEBUG_OBJECT (pad,
1553 "last EOS for input, forwarding and removing slot");
1554 peer = gst_pad_get_peer (pad);
1556 gst_pad_send_event (peer, ev);
1557 gst_object_unref (peer);
1559 gst_event_unref (ev);
1561 SELECTION_LOCK (dbin);
1562 /* FIXME : Shouldn't we try to re-assign the output instead of just
1564 /* Remove the output */
1566 DecodebinOutputStream *output = slot->output;
1567 dbin->output_streams = g_list_remove (dbin->output_streams, output);
1568 free_output_stream (dbin, output);
1570 dbin->slots = g_list_remove (dbin->slots, slot);
1571 free_multiqueue_slot_async (dbin, slot);
1572 SELECTION_UNLOCK (dbin);
1573 ret = GST_PAD_PROBE_HANDLED;
1576 case GST_EVENT_CUSTOM_DOWNSTREAM:
1577 if (gst_event_has_name (ev, "decodebin3-custom-eos")) {
1578 slot->is_drained = TRUE;
1579 SELECTION_LOCK (dbin);
1580 if (slot->input == NULL) {
1581 GST_DEBUG_OBJECT (pad,
1582 "Got custom-eos from null input stream, remove output stream");
1583 /* Remove the output */
1585 DecodebinOutputStream *output = slot->output;
1586 dbin->output_streams =
1587 g_list_remove (dbin->output_streams, output);
1588 free_output_stream (dbin, output);
1590 dbin->slots = g_list_remove (dbin->slots, slot);
1591 free_multiqueue_slot_async (dbin, slot);
1593 SELECTION_UNLOCK (dbin);
1594 ret = GST_PAD_PROBE_DROP;
1600 } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
1601 GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
1602 switch (GST_QUERY_TYPE (query)) {
1603 case GST_QUERY_CAPS:
1605 GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
1606 gst_query_set_caps_result (query, GST_CAPS_ANY);
1607 ret = GST_PAD_PROBE_HANDLED;
1611 case GST_QUERY_ACCEPT_CAPS:
1613 GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
1614 /* If the current decoder doesn't accept caps, we'll reconfigure
1615 * on the actual caps event. So accept any caps. */
1616 gst_query_set_accept_caps_result (query, TRUE);
1617 ret = GST_PAD_PROBE_HANDLED;
/* create_new_slot:
 * @dbin: the decodebin3 instance
 * @type: stream type the slot is created for
 *
 * Allocates a zeroed MultiQueueSlot, gives it the next id from
 * dbin->slot_id, requests a "sink_%u" pad on dbin->multiqueue and resolves
 * the matching source pad through the sink pad's internal links. The sink
 * pad's "group-id" property is set to @type, multiqueue_src_probe is
 * installed on the source pad for downstream events and queries, and the
 * slot is appended to dbin->slots.
 *
 * On the visible failure path the requested sink pad is released again.
 */
1627 /* Create a new multiqueue slot for the given type
1629 * It is up to the caller to know whether that slot is needed or not
1630 * (and release it when no longer needed) */
1631 static MultiQueueSlot *
1632 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
1634 MultiQueueSlot *slot;
1635 GstIterator *it = NULL;
1636 GValue item = { 0, };
1638 GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
1639 gst_stream_type_get_name (type));
1640 slot = g_new0 (MultiQueueSlot, 1);
1642 slot->id = dbin->slot_id++;
1644 slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u");
1645 if (slot->sink_pad == NULL)
1647 it = gst_pad_iterate_internal_links (slot->sink_pad);
1648 if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
1649 || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
/* The source pad is exactly what could not be obtained on this path, and
 * the message talks about the sink pad: log the sink pad's name. */
1650 GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
1651 GST_DEBUG_PAD_NAME (slot->sink_pad));
1654 gst_iterator_free (it);
1655 g_value_reset (&item);
1657 g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
1659 /* Add event probe */
1661 gst_pad_add_probe (slot->src_pad,
1662 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
1663 (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
1665 GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
1666 GST_DEBUG_PAD_NAME (slot->src_pad));
1667 dbin->slots = g_list_append (dbin->slots, slot);
1674 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
/* get_slot_for_input:
 * @dbin: the decodebin3 instance
 * @input: the input stream a slot is wanted for
 *
 * Resolution order, as far as visible here:
 *  1. a slot whose ->input already is @input;
 *  2. among slots with no input and the same stream type as
 *     @input->active_stream, the candidates are compared by stream-id
 *     against @input's stream-id; an unused slot is remembered as
 *     `empty_slot` and, when re-used, gets its ->input set to @input;
 *  3. otherwise a brand new slot from create_new_slot().
 * When @input has no active stream, the type used for matching is 0 and no
 * stream-id comparison takes place.
 */
1680 /* Must be called with SELECTION_LOCK */
1681 static MultiQueueSlot *
1682 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
1685 MultiQueueSlot *empty_slot = NULL;
1686 GstStreamType input_type = 0;
1687 gchar *stream_id = NULL;
1689 GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
1690 input, input->active_stream,
1692 active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
1694 if (input->active_stream) {
1695 input_type = gst_stream_get_stream_type (input->active_stream);
1696 stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
1699 /* Go over existing slots and check if there is already one for it */
1700 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1701 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1702 /* Already used input, return that one */
1703 if (slot->input == input) {
1704 GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
1709 /* Go amongst all unused slots of the right type and try to find a candidate */
1710 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1711 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1712 if (slot->input == NULL && input_type == slot->type) {
1713 /* Remember this empty slot for later */
1715 /* Check if available slot is of the same stream_id */
1716 GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
1717 slot->id, slot->active_stream);
1718 if (stream_id && slot->active_stream) {
1720 (gchar *) gst_stream_get_stream_id (slot->active_stream);
1721 GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
1722 ostream_id, stream_id);
1723 if (!g_strcmp0 (stream_id, ostream_id))
1730 GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
1731 empty_slot->input = input;
1736 return create_new_slot (dbin, input_type);
/* link_input_to_slot:
 * @input: the input stream to connect
 * @slot: the slot to connect it to
 *
 * Logs an error when @slot is already used by a different input. Otherwise
 * links @input's source pad to the slot's sink pad without any link checks,
 * records @input's active stream as the slot's pending stream, stores
 * @input on the slot, and re-sends the source pad's sticky STREAM_START
 * event to the slot's sink pad.
 */
1742 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
1745 if (slot->input != NULL && slot->input != input) {
1746 GST_ERROR_OBJECT (slot->dbin,
1747 "Trying to link input to an already used slot");
1750 gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
1751 slot->pending_stream = input->active_stream;
1752 slot->input = input;
1753 event = gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
1755 gst_pad_send_event (slot->sink_pad, event);
/* have_factory:
 * @dbin: the decodebin3 instance
 * @caps: caps to find a handler for
 * @ftype: GST_ELEMENT_FACTORY_TYPE_DECODER selects dbin->decoder_factories,
 *         anything else selects dbin->decodable_factories
 *
 * With dbin->factories_lock held, refreshes the factory lists and filters
 * the selected list down to factories whose sink pads accept @caps. The
 * filtered list is released again before returning.
 * NOTE(review): the lines setting `ret` are not visible in this listing;
 * presumably it becomes TRUE when the filtered list is non-empty -- confirm.
 */
1760 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
1761 GstElementFactoryListType ftype)
1763 gboolean ret = FALSE;
1766 g_mutex_lock (&dbin->factories_lock);
1767 gst_decode_bin_update_factories_list (dbin);
1768 if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
1770 gst_element_factory_list_filter (dbin->decoder_factories,
1771 caps, GST_PAD_SINK, TRUE);
1774 gst_element_factory_list_filter (dbin->decodable_factories,
1775 caps, GST_PAD_SINK, TRUE);
1776 g_mutex_unlock (&dbin->factories_lock);
1780 gst_plugin_feature_list_free (res);
/* create_element:
 * @dbin: the decodebin3 instance
 * @stream: the stream an element is wanted for
 * @ftype: GST_ELEMENT_FACTORY_TYPE_DECODER selects dbin->decoder_factories,
 *         anything else selects dbin->decodable_factories
 *
 * With dbin->factories_lock held, refreshes the factory lists and filters
 * the selected list by the caps of @stream (sink direction). An element is
 * then instantiated from the first factory of the filtered list; when the
 * list is empty a debug message is logged instead. The caps reference
 * obtained from @stream is released before returning.
 *
 * `element` stays NULL both when no factory matched and when the factory
 * failed to instantiate.
 */
1788 create_element (GstDecodebin3 * dbin, GstStream * stream,
1789 GstElementFactoryListType ftype)
1792 GstElement *element = NULL;
1795 g_mutex_lock (&dbin->factories_lock);
1796 gst_decode_bin_update_factories_list (dbin);
1797 caps = gst_stream_get_caps (stream);
1798 if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
1800 gst_element_factory_list_filter (dbin->decoder_factories,
1801 caps, GST_PAD_SINK, TRUE);
1804 gst_element_factory_list_filter (dbin->decodable_factories,
1805 caps, GST_PAD_SINK, TRUE);
1806 g_mutex_unlock (&dbin->factories_lock);
1810 gst_element_factory_create ((GstElementFactory *) res->data, NULL);
/* gst_element_factory_create() may return NULL; GST_ELEMENT_NAME would then
 * dereference a NULL element, so guard the log argument. */
1811 GST_DEBUG ("Created element '%s'", element ? GST_ELEMENT_NAME (element) : "<NONE>");
1812 gst_plugin_feature_list_free (res);
1814 GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT, caps);
1817 gst_caps_unref (caps);
/* create_decoder:
 * @dbin: the decodebin3 instance
 * @stream: the stream a decoder is wanted for
 *
 * Thin wrapper: returns create_element() for @stream restricted to
 * GST_ELEMENT_FACTORY_TYPE_DECODER.
 */
1821 /* FIXME : VERY NAIVE. ASSUMING FIRST ONE WILL WORK */
1823 create_decoder (GstDecodebin3 * dbin, GstStream * stream)
1825 return create_element (dbin, stream, GST_ELEMENT_FACTORY_TYPE_DECODER);
/* keyframe_waiter_probe:
 * @pad: the pad the probe is attached to
 * @info: probe info carrying the buffer
 * @output: the output stream that installed the probe
 *
 * Buffer probe that drops buffers until one arrives that either is not
 * flagged DELTA_UNIT or is flagged HEADER. On that buffer,
 * output->drop_probe_id is reset to 0 and the probe removes itself so all
 * later data passes through.
 */
1828 static GstPadProbeReturn
1829 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
1830 DecodebinOutputStream * output)
1832 GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
1833 /* If we have a keyframe, remove the probe and let all data through */
1834 /* FIXME : HANDLE HEADER BUFFER ?? */
1835 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1836 GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
1837 GST_DEBUG_OBJECT (pad,
1838 "Buffer is keyframe or header, letting through and removing probe");
1839 output->drop_probe_id = 0;
1840 return GST_PAD_PROBE_REMOVE;
1842 GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
1843 return GST_PAD_PROBE_DROP;
/* reconfigure_output_stream:
 * @output: the output stream to (re)configure
 * @slot: the slot that should feed @output
 *
 * A decoder is needed when the caps of slot->active_stream cannot intersect
 * dbin->caps.
 *
 * Visible flow:
 *  - an output still attached to a different slot only produces a warning;
 *  - with an existing decoder: when a decoder is still needed, the decoder
 *    sink pad is asked whether it accepts the new caps; otherwise the
 *    decoder is not reusable. A reusable decoder is kept (a keyframe-waiter
 *    probe is added for video outputs that have none, and the slot is
 *    linked if it was not). A non-reusable decoder is unlinked, its drop
 *    probe removed, the ghost pad target cleared, and the element is set to
 *    NULL state and removed from the bin;
 *  - when a decoder is needed, one is created with create_decoder() (a
 *    missing-decoder message is posted with the selection lock released
 *    when that fails), added to the bin, its static "sink"/"src" pads are
 *    fetched, a keyframe-waiter probe is added for video, and the slot is
 *    linked to the decoder sink;
 *  - when no decoder is needed, the slot source pad itself is used as
 *    decoder_src and decoder_sink is NULL;
 *  - finally the ghost pad target is set, the ghost pad is added to the
 *    element on first exposure, the decoder (if any) is synced to the
 *    parent state, and output->slot is set to @slot.
 *
 * The trailing "Cleanup" section releases decoder_sink/decoder_src and
 * tears down the decoder.
 */
1847 reconfigure_output_stream (DecodebinOutputStream * output,
1848 MultiQueueSlot * slot)
1850 GstDecodebin3 *dbin = output->dbin;
1851 GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
1852 gboolean needs_decoder;
1854 needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
1856 GST_DEBUG_OBJECT (dbin,
1857 "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
1860 /* FIXME : Maybe make the output un-hook itself automatically ? */
1861 if (output->slot != NULL && output->slot != slot) {
1862 GST_WARNING_OBJECT (dbin,
1863 "Output still linked to another slot (%p)", output->slot);
1864 gst_caps_unref (new_caps);
1868 /* Check if existing config is reusable as-is by checking if
1869 * the existing decoder accepts the new caps, if not delete
1870 * it and create a new one */
1871 if (output->decoder) {
1872 gboolean can_reuse_decoder;
1874 if (needs_decoder) {
1876 gst_pad_query_accept_caps (output->decoder_sink, new_caps);
1878 can_reuse_decoder = FALSE;
1880 if (can_reuse_decoder) {
1881 if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
1882 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
1883 output->drop_probe_id =
1884 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
1885 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
1887 GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
1888 if (output->linked == FALSE) {
1889 gst_pad_link_full (slot->src_pad, output->decoder_sink,
1890 GST_PAD_LINK_CHECK_NOTHING);
1891 output->linked = TRUE;
1893 gst_caps_unref (new_caps);
1897 GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
1900 gst_pad_unlink (slot->src_pad, output->decoder_sink);
1901 output->linked = FALSE;
1902 if (output->drop_probe_id) {
1903 gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
1904 output->drop_probe_id = 0;
1907 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
1908 GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
1909 gst_caps_unref (new_caps);
/* The old decoder's state is locked before it is shut down and removed. */
1913 gst_element_set_locked_state (output->decoder, TRUE);
1914 gst_element_set_state (output->decoder, GST_STATE_NULL);
1916 gst_bin_remove ((GstBin *) dbin, output->decoder);
1917 output->decoder = NULL;
1920 gst_caps_unref (new_caps);
1922 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
1923 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
1925 /* If a decoder is required, create one */
1926 if (needs_decoder) {
1927 /* If we don't have a decoder yet, instantiate one */
1928 output->decoder = create_decoder (dbin, slot->active_stream);
1929 if (output->decoder == NULL) {
1932 SELECTION_UNLOCK (dbin);
1933 /* FIXME : Should we be smarter if there's a missing decoder ?
1934 * Should we deactivate that stream ? */
1935 caps = gst_stream_get_caps (slot->active_stream);
1936 gst_element_post_message (GST_ELEMENT_CAST (dbin),
1937 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1938 gst_caps_unref (caps);
1939 SELECTION_LOCK (dbin);
1942 if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
1943 GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
1946 output->decoder_sink = gst_element_get_static_pad (output->decoder, "sink");
1947 output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
1948 if (output->type & GST_STREAM_TYPE_VIDEO) {
1949 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
1950 output->drop_probe_id =
1951 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
1952 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
1954 if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
1955 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
1956 GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
1957 GST_DEBUG_PAD_NAME (output->decoder_sink));
/* No decoder needed: the slot's own source pad becomes the ghost pad
 * target and there is no decoder sink pad. */
1961 output->decoder_src = gst_object_ref (slot->src_pad);
1962 output->decoder_sink = NULL;
1964 output->linked = TRUE;
1965 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
1966 output->decoder_src)) {
1967 GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
1970 if (output->src_exposed == FALSE) {
1971 output->src_exposed = TRUE;
1972 gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
1975 if (output->decoder)
1976 gst_element_sync_state_with_parent (output->decoder);
1978 output->slot = slot;
1983 GST_DEBUG_OBJECT (dbin, "Cleanup");
1984 if (output->decoder_sink) {
1985 gst_object_unref (output->decoder_sink);
1986 output->decoder_sink = NULL;
1988 if (output->decoder_src) {
1989 gst_object_unref (output->decoder_src);
1990 output->decoder_src = NULL;
1992 if (output->decoder) {
1993 gst_element_set_state (output->decoder, GST_STATE_NULL);
1994 gst_bin_remove ((GstBin *) dbin, output->decoder);
1995 output->decoder = NULL;
/* idle_reconfigure:
 * @pad: the pad the idle probe fired on
 * @info: probe info (unused in the visible code)
 * @slot: the slot to reconfigure
 *
 * One-shot idle probe (it always returns GST_PAD_PROBE_REMOVE). Under the
 * selection lock it fetches the output for @slot with get_output_for_slot(),
 * reconfigures it with reconfigure_output_stream() and asks
 * is_selection_done() for a message; that message is posted on the element
 * after the lock has been released.
 */
2000 static GstPadProbeReturn
2001 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2003 GstMessage *msg = NULL;
2004 DecodebinOutputStream *output;
2006 SELECTION_LOCK (slot->dbin);
2007 output = get_output_for_slot (slot);
2009 GST_DEBUG_OBJECT (pad, "output : %p", output);
2012 reconfigure_output_stream (output, slot);
2013 msg = is_selection_done (slot->dbin);
2015 SELECTION_UNLOCK (slot->dbin);
2017 gst_element_post_message ((GstElement *) slot->dbin, msg);
2019 return GST_PAD_PROBE_REMOVE;
/* find_slot_for_stream_id:
 * @dbin: the decodebin3 instance
 * @sid: stream-id to look for
 *
 * Iterates dbin->slots. A slot matches when the stream-id of its active
 * stream equals @sid, or when it has a pending stream different from the
 * active one whose stream-id equals @sid.
 * NOTE(review): the return statements are not visible in this listing;
 * presumably the matching slot is returned, NULL when none matches --
 * confirm.
 */
2022 static MultiQueueSlot *
2023 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2027 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2028 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2029 const gchar *stream_id;
2030 if (slot->active_stream) {
2031 stream_id = gst_stream_get_stream_id (slot->active_stream);
2032 if (!g_strcmp0 (sid, stream_id))
2035 if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2036 stream_id = gst_stream_get_stream_id (slot->pending_stream);
2037 if (!g_strcmp0 (sid, stream_id))
/* reassign_slot:
 * @dbin: the decodebin3 instance
 * @slot: the slot whose output should be taken away
 *
 * Under the selection lock:
 *  - bails out when @slot has no active stream, has no output, or when its
 *    stream-id is (again) part of dbin->requested_selection;
 *  - otherwise unlinks the slot from the decoder sink pad (if any), breaks
 *    the slot<->output association and removes the stream-id from
 *    dbin->active_selection;
 *  - then walks dbin->to_activate looking for a slot of the same type as
 *    the output that has no output yet. When one is found, its stream-id is
 *    moved from to_activate to requested_selection, the output is attached
 *    to it, the stream-id is appended to active_selection, and an idle
 *    probe running idle_reconfigure is installed on the target slot's
 *    source pad after the lock is released;
 *  - when no target is found, the output is removed from
 *    dbin->output_streams and freed, and the message from
 *    is_selection_done() is posted after the lock is released.
 */
2045 /* This function handles the reassignment of a slot. Call this from
2046 * the streaming thread of a slot. */
2048 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2050 DecodebinOutputStream *output;
2051 MultiQueueSlot *target_slot = NULL;
2053 const gchar *sid, *tsid;
2055 SELECTION_LOCK (dbin);
2056 output = slot->output;
2058 if (G_UNLIKELY (slot->active_stream == NULL)) {
2059 GST_DEBUG_OBJECT (slot->src_pad,
2060 "Called on inactive slot (active_stream == NULL)");
2061 SELECTION_UNLOCK (dbin);
2065 if (G_UNLIKELY (output == NULL)) {
2066 GST_DEBUG_OBJECT (slot->src_pad,
2067 "Slot doesn't have any output to be removed");
2068 SELECTION_UNLOCK (dbin);
2072 sid = gst_stream_get_stream_id (slot->active_stream);
2073 GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2075 /* Recheck whether this stream is still in the list of streams to deactivate */
2076 if (stream_in_list (dbin->requested_selection, sid)) {
2077 /* Stream is in the list of requested streams, don't remove */
2078 SELECTION_UNLOCK (dbin);
2079 GST_DEBUG_OBJECT (slot->src_pad,
2080 "Stream '%s' doesn't need to be deactivated", sid);
2084 /* Unlink slot from output */
2085 /* FIXME : Handle flushing ? */
2086 /* FIXME : Handle outputs without decoders */
2087 GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2088 output->decoder_sink);
2089 if (output->decoder_sink)
2090 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2091 output->linked = FALSE;
2092 slot->output = NULL;
2093 output->slot = NULL;
2094 /* Remove sid from active selection */
2095 for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2096 if (!g_strcmp0 (sid, tmp->data)) {
2097 dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2101 /* Can we re-assign this output to a requested stream ? */
2102 GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2103 for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2104 MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
/* tslot is only NULL-checked by the `if` below, and a slot matched on its
 * pending stream may have no active stream yet: log against dbin and guard
 * both dereferences. */
2105 GST_LOG_OBJECT (dbin, "Checking slot %p (output:%p , stream:%s)",
2106 tslot, tslot ? tslot->output : NULL,
(tslot && tslot->active_stream) ?
gst_stream_get_stream_id (tslot->active_stream) : "<NONE>");
2107 if (tslot && tslot->type == output->type && tslot->output == NULL) {
2108 GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2109 target_slot = tslot;
2111 /* Pass target stream id to requested selection */
2112 dbin->requested_selection =
2113 g_list_append (dbin->requested_selection, tmp->data);
2114 dbin->to_activate = g_list_remove (dbin->to_activate, tmp->data);
2120 GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2122 target_slot->output = output;
2123 output->slot = target_slot;
2124 dbin->active_selection =
2125 g_list_append (dbin->active_selection, (gchar *) tsid);
2126 SELECTION_UNLOCK (dbin);
2128 /* Wakeup the target slot so that it retries to send events/buffers
2129 * thereby triggering the output reconfiguration codepath */
2130 gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2131 (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2132 /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2136 dbin->output_streams = g_list_remove (dbin->output_streams, output);
2137 free_output_stream (dbin, output);
2138 msg = is_selection_done (slot->dbin);
2139 SELECTION_UNLOCK (dbin);
2142 gst_element_post_message ((GstElement *) slot->dbin, msg);
/* slot_unassign_probe:
 * @pad: the pad the idle probe fired on (unused in the visible code)
 * @info: probe info (unused in the visible code)
 * @slot: the slot to unassign
 *
 * One-shot probe: delegates all work to reassign_slot() and always returns
 * GST_PAD_PROBE_REMOVE.
 */
2148 /* Idle probe called when a slot should be unassigned from its output stream.
2149 * This is needed to ensure nothing is flowing when unlinking the slot.
2151 * Also, this method will search for a pending stream which could re-use
2152 * the output stream. */
2153 static GstPadProbeReturn
2154 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2155 MultiQueueSlot * slot)
2157 GstDecodebin3 *dbin = slot->dbin;
2159 reassign_slot (dbin, slot);
2161 return GST_PAD_PROBE_REMOVE;
/* handle_stream_switch:
 * @dbin: the decodebin3 instance
 * @select_streams: list whose elements are read as stream-id strings
 *
 * The body also reads a `seqnum` value (callers pass the seqnum of the
 * SELECT_STREAMS event) and compares it with dbin->select_streams_seqnum to
 * detect that a newer request superseded this one.
 *
 * With the selection lock held, the requested stream-ids are compared with
 * the existing multiqueue slots and sorted into local work lists:
 *  - to_activate: slots for a requested stream that have no output yet
 *  - to_deactivate: slots whose active/pending stream is not requested
 *  - pending_streams: requested ids found by stream_in_collection()
 *  - unknown: requested ids reported as having no slot
 *  - future_request_streams: ids that end up in dbin->requested_selection
 *  - to_reassign / slots_to_reassign: stream-ids and slots collected when a
 *    slot to activate has the same type as a slot to deactivate
 *
 * After the lock is released, an idle probe running slot_unassign_probe() is
 * added on the source pad of every slot in slots_to_reassign.
 *
 * NOTE(review): in the visible code the to_activate, unknown and
 * slots_to_reassign lists are never passed to g_list_free() - confirm
 * whether their list nodes leak.
 * NOTE(review): slot->active_stream is tested for NULL before the
 * stream_in_list() check, but its stream-id is read without a visible NULL
 * check in the "should be deactivated" debug message and when filling
 * to_reassign - confirm it cannot be NULL at those points. */
2165 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2168 gboolean ret = TRUE;
2170 /* List of slots to (de)activate. */
2171 GList *to_deactivate = NULL;
2172 GList *to_activate = NULL;
2173 /* List of unknown stream id, most likely means the event
2174 * should be sent upstream so that elements can expose the requested stream */
2175 GList *unknown = NULL;
2176 GList *to_reassign = NULL;
2177 GList *future_request_streams = NULL;
2178 GList *pending_streams = NULL;
2179 GList *slots_to_reassign = NULL;
/* The selection state of dbin is only touched between SELECTION_LOCK and
 * SELECTION_UNLOCK. */
2181 SELECTION_LOCK (dbin);
2182 if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2183 GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2184 SELECTION_UNLOCK (dbin);
2187 /* Remove pending select_streams */
2188 g_list_free (dbin->pending_select_streams);
2189 dbin->pending_select_streams = NULL;
2191 /* COMPARE the requested streams to the active and requested streams
2194 /* First check the slots to activate and which ones are unknown */
2195 for (tmp = select_streams; tmp; tmp = tmp->next) {
2196 const gchar *sid = (const gchar *) tmp->data;
2197 MultiQueueSlot *slot;
2198 GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
2199 slot = find_slot_for_stream_id (dbin, sid);
2200 /* Find the corresponding slot */
2202 if (stream_in_collection (dbin, (gchar *) sid)) {
2203 pending_streams = g_list_append (pending_streams, (gchar *) sid);
2205 GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
2206 unknown = g_list_append (unknown, (gchar *) sid);
2208 } else if (slot->output == NULL) {
2209 GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
2211 to_activate = g_list_append (to_activate, slot);
2213 GST_DEBUG_OBJECT (dbin,
2214 "Stream '%s' from slot %p is already active on output %p", sid, slot,
2216 future_request_streams =
2217 g_list_append (future_request_streams, (gchar *) sid);
/* Walk every slot and collect in to_deactivate those whose active stream and
 * pending stream are both absent from the requested list. */
2221 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2222 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2223 /* For slots that have an output, check if it's part of the streams to
2226 gboolean slot_to_deactivate = TRUE;
2228 if (slot->active_stream) {
2229 if (stream_in_list (select_streams,
2230 gst_stream_get_stream_id (slot->active_stream)))
2231 slot_to_deactivate = FALSE;
2233 if (slot_to_deactivate && slot->pending_stream
2234 && slot->pending_stream != slot->active_stream) {
2235 if (stream_in_list (select_streams,
2236 gst_stream_get_stream_id (slot->pending_stream)))
2237 slot_to_deactivate = FALSE;
2239 if (slot_to_deactivate) {
2240 GST_DEBUG_OBJECT (dbin,
2241 "Slot %p (%s) should be deactivated, no longer used", slot,
2242 gst_stream_get_stream_id (slot->active_stream));
2243 to_deactivate = g_list_append (to_deactivate, slot);
2248 if (to_deactivate != NULL) {
2249 GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
2250 /* We need to compare what needs to be activated and deactivated in order
2251 * to determine whether there are outputs that can be transferred */
2252 /* Take the stream-id of the slots that are to be activated, for which there
2253 * is a slot of the same type that needs to be deactivated */
2254 tmp = to_deactivate;
2256 MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
2257 gboolean removeit = FALSE;
2259 GST_DEBUG_OBJECT (dbin,
2260 "Checking if slot to deactivate (%p) has a candidate slot to activate",
2261 slot_to_deactivate);
2262 for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
2263 MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
2264 GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
2265 if (slot_to_activate->type == slot_to_deactivate->type) {
2266 GST_DEBUG_OBJECT (dbin, "Re-using");
2267 to_reassign = g_list_append (to_reassign, (gchar *)
2268 gst_stream_get_stream_id (slot_to_activate->active_stream));
2270 g_list_append (slots_to_reassign, slot_to_deactivate);
2271 to_activate = g_list_remove (to_activate, slot_to_activate);
2278 to_deactivate = g_list_delete_link (to_deactivate, tmp);
/* Slots still left in to_deactivate found no same-type replacement; they are
 * queued in slots_to_reassign as well so the idle probe runs for them too. */
2283 for (tmp = to_deactivate; tmp; tmp = tmp->next) {
2284 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2285 GST_DEBUG_OBJECT (dbin,
2286 "Really need to deactivate slot %p, but no available alternative",
2289 slots_to_reassign = g_list_append (slots_to_reassign, slot);
2292 /* The only slots left to activate are the ones that won't be reassigned and
2293 * therefore really need to have a new output created */
2294 for (tmp = to_activate; tmp; tmp = tmp->next) {
2295 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2296 if (slot->active_stream)
2297 future_request_streams =
2298 g_list_append (future_request_streams,
2299 (gchar *) gst_stream_get_stream_id (slot->active_stream));
2300 else if (slot->pending_stream)
2301 future_request_streams =
2302 g_list_append (future_request_streams,
2303 (gchar *) gst_stream_get_stream_id (slot->pending_stream));
2305 GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
/* Nothing can be activated right now but some requested ids were found by
 * stream_in_collection(): store the incoming list itself as the requested
 * selection and drop the deactivation list. */
2308 if (to_activate == NULL && pending_streams != NULL) {
2309 GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
2310 if (dbin->requested_selection)
2311 g_list_free (dbin->requested_selection);
2312 dbin->requested_selection = select_streams;
2313 g_list_free (to_deactivate);
2314 g_list_free (pending_streams);
2315 to_deactivate = NULL;
/* NOTE(review): presumably the alternative branch of the check above.
 * The requested selection becomes future_request_streams followed by
 * pending_streams, and dbin->to_activate receives the stream-ids collected
 * in to_reassign. */
2317 if (dbin->requested_selection)
2318 g_list_free (dbin->requested_selection);
2319 dbin->requested_selection = future_request_streams;
2320 dbin->requested_selection =
2321 g_list_concat (dbin->requested_selection, pending_streams);
2322 if (dbin->to_activate)
2323 g_list_free (dbin->to_activate);
2324 dbin->to_activate = to_reassign;
2327 dbin->selection_updated = TRUE;
2328 SELECTION_UNLOCK (dbin);
2331 GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
2333 /* For all streams to deactivate, add an idle probe where we will do
2334 * the unassignment and switch over */
2335 for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
2336 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2337 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2338 (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
/* Event probe callback receiving the DecodebinOutputStream as user data.
 * create_output_stream() installs it for upstream events on the internal
 * proxy pad of each output ghost pad.
 *
 * For GST_EVENT_SELECT_STREAMS it records the request on the decodebin3
 * instance, forwards the event to the peer of @pad, hands the parsed stream
 * list to handle_stream_switch() and reports GST_PAD_PROBE_HANDLED.
 * `ret` starts out as GST_PAD_PROBE_OK. */
2344 static GstPadProbeReturn
2345 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
2346 DecodebinOutputStream * output)
2348 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2349 GstDecodebin3 *dbin = output->dbin;
2350 GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
2352 GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
2354 switch (GST_EVENT_TYPE (event)) {
2355 case GST_EVENT_SELECT_STREAMS:
2358 GList *streams = NULL;
2359 guint32 seqnum = gst_event_get_seqnum (event);
/* The seqnum identifies the request: one equal to the last recorded seqnum
 * is logged as already handled/being handled. */
2361 SELECTION_LOCK (dbin);
2362 if (seqnum == dbin->select_streams_seqnum) {
2363 SELECTION_UNLOCK (dbin);
2364 GST_DEBUG_OBJECT (pad,
2365 "Already handled/handling that SELECT_STREAMS event");
2368 dbin->select_streams_seqnum = seqnum;
2369 if (dbin->pending_select_streams != NULL) {
2370 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2371 g_list_free (dbin->pending_select_streams);
2372 dbin->pending_select_streams = NULL;
/* g_list_copy() is a shallow copy: the stored list shares its string
 * pointers with `streams`. */
2374 gst_event_parse_select_streams (event, &streams);
2375 dbin->pending_select_streams = g_list_copy (streams);
2376 SELECTION_UNLOCK (dbin);
2378 /* Send event upstream */
2379 if ((peer = gst_pad_get_peer (pad))) {
2380 gst_pad_send_event (peer, event);
2381 gst_object_unref (peer);
/* NOTE(review): presumably only reached when there is no peer to take the
 * event - confirm. */
2383 gst_event_unref (event);
2385 /* Finally handle the switch */
2387 handle_stream_switch (dbin, streams, seqnum);
2388 ret = GST_PAD_PROBE_HANDLED;
/* Handler for events sent directly to the decodebin3 element.
 *
 * GST_EVENT_SELECT_STREAMS is handled here: the request is recorded under
 * the selection lock (seqnum plus a copy of the stream list), passed to
 * handle_stream_switch() and the event is unreffed.
 * The visible code also chains up to the parent class send_event().
 * NOTE(review): presumably that is only reached for other event types -
 * confirm. */
2399 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
2401 GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
2402 if (GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
2403 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2404 GList *streams = NULL;
2405 guint32 seqnum = gst_event_get_seqnum (event);
/* A seqnum equal to the last recorded one is logged as already
 * handled/being handled. */
2407 SELECTION_LOCK (dbin);
2408 if (seqnum == dbin->select_streams_seqnum) {
2409 SELECTION_UNLOCK (dbin);
2410 GST_DEBUG_OBJECT (dbin,
2411 "Already handled/handling that SELECT_STREAMS event");
2414 dbin->select_streams_seqnum = seqnum;
2415 if (dbin->pending_select_streams != NULL) {
2416 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2417 g_list_free (dbin->pending_select_streams);
2418 dbin->pending_select_streams = NULL;
/* g_list_copy() is a shallow copy: the stored list shares its string
 * pointers with `streams`. */
2420 gst_event_parse_select_streams (event, &streams);
2421 dbin->pending_select_streams = g_list_copy (streams);
2422 SELECTION_UNLOCK (dbin);
2424 /* FIXME : We don't have an upstream ?? */
/* NOTE(review): `pad` and `peer` are not declared anywhere in the visible
 * part of this function, so the forwarding code below is presumably
 * compiled out by a preprocessor guard - confirm. */
2426 /* Send event upstream */
2427 if ((peer = gst_pad_get_peer (pad))) {
2428 gst_pad_send_event (peer, event);
2429 gst_object_unref (peer);
2432 /* Finally handle the switch */
2434 handle_stream_switch (dbin, streams, seqnum);
2436 gst_event_unref (event);
2439 return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
/* Tears down a multiqueue slot: removes the probe recorded in
 * slot->probe_id from the slot source pad, unlinks the input source pad from
 * the slot sink pad, hands the sink pad back to the multiqueue as a released
 * request pad, and finally drops the references held on the two pads and on
 * the active stream. */
2444 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2447 gst_pad_remove_probe (slot->src_pad, slot->probe_id);
2449 if (slot->input->srcpad)
2450 gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
2453 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
/* gst_object_replace() with a NULL replacement unrefs the old object and
 * resets the field to NULL. */
2454 gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
2455 gst_object_replace ((GstObject **) & slot->src_pad, NULL);
2456 gst_object_replace ((GstObject **) & slot->active_stream, NULL);
/* Asynchronous variant of free_multiqueue_slot(): the call is scheduled
 * through gst_element_call_async() with @slot as user data and no destroy
 * notify, so the actual teardown happens later, outside the caller. */
2461 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2463 GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
2464 gst_element_call_async (GST_ELEMENT_CAST (dbin),
2465 (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
/* create_output_stream:
 * @dbin: the decodebin3 instance
 * @type: stream type flags used to choose the pad template and counter
 *
 * Allocates a zero-initialised DecodebinOutputStream, then:
 *  - picks the static pad template and the per-type pad counter of @dbin
 *    by testing @type for VIDEO, then AUDIO, then TEXT, with a generic
 *    template/counter as fallback;
 *  - names the pad with the "%s_%u" pattern from `prefix` and the counter
 *    value;
 *  - creates a ghost pad without target from the template and activates it;
 *  - installs ghost_pad_event_probe() for upstream events on the ghost pad's
 *    internal proxy pad, with the new output as user data;
 *  - appends the new output to dbin->output_streams. */
2468 /* Create a DecodebinOutputStream for a given type
2469 * Note: It will be empty initially, it needs to be configured
2471 static DecodebinOutputStream *
2472 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
2474 DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
2476 const gchar *prefix;
2477 GstStaticPadTemplate *templ;
2478 GstPadTemplate *ptmpl;
2480 GstPad *internal_pad;
2482 GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
2483 res, gst_stream_type_get_name (type));
2488 if (type & GST_STREAM_TYPE_VIDEO) {
2489 templ = &video_src_template;
2490 counter = &dbin->vpadcount;
2492 } else if (type & GST_STREAM_TYPE_AUDIO) {
2493 templ = &audio_src_template;
2494 counter = &dbin->apadcount;
2496 } else if (type & GST_STREAM_TYPE_TEXT) {
2497 templ = &text_src_template;
2498 counter = &dbin->tpadcount;
2501 templ = &src_template;
2502 counter = &dbin->opadcount;
2506 pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
2508 ptmpl = gst_static_pad_template_get (templ);
2509 res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
2510 gst_object_unref (ptmpl);
2512 gst_pad_set_active (res->src_pad, TRUE);
2513 /* Put an event probe on the internal proxy pad to detect upstream
2516 (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
2517 gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
2518 (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
2519 gst_object_unref (internal_pad);
2521 dbin->output_streams = g_list_append (dbin->output_streams, res);
/* Dismantles an output stream of @dbin: detaches it from its slot, clears
 * the ghost pad target, removes the exposed source pad from the element and
 * shuts down and removes the decoder.
 * The caller is responsible for removing @output from dbin->output_streams;
 * this function does not touch that list. */
2527 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
/* Only unlink from the slot when a decoder and its sink pad are present. */
2530 if (output->decoder_sink && output->decoder)
2531 gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
/* Break the association in both directions. */
2533 output->slot->output = NULL;
2534 output->slot = NULL;
2536 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2537 gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
2538 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2539 if (output->src_exposed) {
2540 gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
/* Lock the decoder state before setting it to NULL, then take it out of
 * the bin. */
2542 if (output->decoder) {
2543 gst_element_set_locked_state (output->decoder, TRUE);
2544 gst_element_set_state (output->decoder, GST_STATE_NULL);
2545 gst_bin_remove ((GstBin *) dbin, output->decoder);
/* State change handler of decodebin3.
 * Chains up to the parent class first and checks the result for
 * GST_STATE_CHANGE_FAILURE. For the PAUSED to READY transition it then
 * releases every output stream and every multiqueue slot and frees the
 * lists that tracked them. */
2550 static GstStateChangeReturn
2551 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
2553 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2554 GstStateChangeReturn ret;
2557 switch (transition) {
2561 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2562 if (ret == GST_STATE_CHANGE_FAILURE)
2565 switch (transition) {
2566 case GST_STATE_CHANGE_PAUSED_TO_READY:
2570 /* Free output streams */
2571 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2572 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2573 free_output_stream (dbin, output);
/* The list nodes are released separately from the outputs they pointed to. */
2575 g_list_free (dbin->output_streams);
2576 dbin->output_streams = NULL;
2577 /* Free multiqueue slots */
2578 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2579 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2580 free_multiqueue_slot (dbin, slot);
2582 g_list_free (dbin->slots);
/* Plugin entry point for decodebin3: initialises the "decodebin3" debug
 * category and registers the element type under the name "decodebin3" with
 * GST_RANK_NONE. Returns the result of gst_element_register(). */
2595 gst_decodebin3_plugin_init (GstPlugin * plugin)
2597 GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");
2599 return gst_element_register (plugin, "decodebin3", GST_RANK_NONE,
2600 GST_TYPE_DECODEBIN3);