3 * Copyright (C) <2015> Centricular Ltd
4 * @author: Edward Hervey <edward@centricular.com>
5 * @author: Jan Schmidt <jan@centricular.com>
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
31 #include <gst/pbutils/pbutils.h>
33 #include "gstplayback.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
41 * 1) From sink pad to elementary streams (GstParseBin)
43 * The input sink pads are fed to GstParseBin. GstParseBin will feed them
44 * through typefind. When the caps are detected (or changed) we recursively
45 * figure out which demuxer, parser or depayloader is needed until we get to
48 * All elementary streams (whether decoded or not, whether exposed or not) are
49 * fed through multiqueue. There is only *one* multiqueue in decodebin3.
51 * => MultiQueue is the cornerstone.
52 * => No buffering before multiqueue
54 * 2) Elementary streams
56 * After GstParseBin, there are 3 main components:
57 * 1) Input Streams (provided by GstParseBin)
61 * Input Streams correspond to the stream coming from GstParseBin and that gets
62 * fed into a multiqueue slot.
64 * Output Streams correspond to the combination of a (optional) decoder and an
65 * output ghostpad. Output Streams can be moved from one multiqueue slot to
66 * another, can reconfigure themselves (different decoders), and can be
67 * added/removed depending on the configuration (all streams outputted, only one
70 * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
71 * each 'active' Input Stream there is a corresponding slot.
72 * Slots might have different streams on input and output (due to internal
75 * Due to internal queuing/buffering/..., all those components (might) behave
76 * asynchronously. Therefore probes will be used on each component source pad to
77 * detect various key-points:
79 * the stream is done => Mark that component as done, optionally freeing/removing it
81 * a new stream is starting => link it further if needed
84 * 3) Gradual replacement
86 * If the caps change at any point in decodebin (input sink pad, demuxer output,
87 * multiqueue output, ..), we gradually replace (if needed) the following elements.
89 * This is handled by the probes in various locations:
91 * b) multiqueue input (source pad of Input Streams)
92 * c) multiqueue output (source pad of Multiqueue Slots)
93 * d) final output (target of source ghostpads)
95 * When a CAPS event arrives at one of those points, one of three things can happen:
96 * a) There are no elements downstream yet, just create/link-to following elements
97 * b) There are downstream elements, do an ACCEPT_CAPS query
98 * b.1) The new CAPS are accepted, keep current configuration
99 * b.2) The new CAPS are not accepted, remove following elements then do a)
106 * Input(s) Slots Streams
107 * /-------------------------------------------\ /-----\ /------------- \
109 * +-------------------------------------------------------------------------+
111 * | +---------------------------------------------+ |
112 * | | GstParseBin(s) | |
113 * | | +--------------+ | +-----+ |
114 * | | | |---[parser]-[|--| Mul |---[ decoder ]-[|
115 * |]--[ typefind ]---| demuxer(s) |------------[| | ti | |
116 * | | | (if needed) |---[parser]-[|--| qu | |
117 * | | | |---[parser]-[|--| eu |---[ decoder ]-[|
118 * | | +--------------+ | +------ ^ |
119 * | +---------------------------------------------+ ^ | |
121 * +-----------------------------------------------+--------+-------------+--+
124 * Probes --/--------/-------------/
128 * We want to ensure we re-use decoders when switching streams. This takes place
129 * at the multiqueue output level.
132 * 1) Activating a stream (i.e. linking a slot to an output) is only done within
133 * the streaming thread in the multiqueue_src_probe() and only if the
134 * stream is in the REQUESTED selection.
135 * 2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
136 * within the stream thread, but only in a purposefully called IDLE probe
137 * that calls reassign_slot().
139 * Based on those two principles, three "selections" of streams (stream-id) are used:
140 * 1) requested_selection
141 * All streams within that list should be activated
142 * 2) active_selection
143 * List of streams that are exposed by decodebin
145 * List of streams that will be moved to requested_selection in the
146 * reassign_slot() method (i.e. once a stream was deactivated, and the output
/* Debug category used by the GST_* logging macros in this file, the GType
 * accessor macro, and forward declarations of the element and its helper
 * structures. */
151 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
152 #define GST_CAT_DEFAULT decodebin3_debug
154 #define GST_TYPE_DECODEBIN3 (gst_decodebin3_get_type ())
156 #define EXTRA_DEBUG 1
/* Element instance and class */
158 typedef struct _GstDecodebin3 GstDecodebin3;
159 typedef struct _GstDecodebin3Class GstDecodebin3Class;
/* Helper structures: parsebin output stream, input (sink pad + parsebin),
 * and exposed output stream */
161 typedef struct _DecodebinInputStream DecodebinInputStream;
162 typedef struct _DecodebinInput DecodebinInput;
163 typedef struct _DecodebinOutputStream DecodebinOutputStream;
/* Instance structure of the decodebin3 element. It groups, in order: the
 * inputs (main and supplementary), the single multiqueue, the active
 * collection with its input streams / output streams / slots, the stream-id
 * selections, the element factory caches and the per-type pad counters. */
165 struct _GstDecodebin3
169 /* input_lock protects the following variables */
171 /* Main input (static sink pad) */
172 DecodebinInput *main_input;
173 /* Supplementary input (request sink pads) */
175 /* counter for input */
176 guint32 input_counter;
177 /* Current stream group_id (default : G_MAXUINT32) */
178 /* FIXME : Needs to be reset appropriately (when upstream changes ?) */
179 guint32 current_group_id;
180 /* End of variables protected by input_lock */
182 GstElement *multiqueue;
184 /* FIXME : Mutex for protecting values below */
185 GstStreamCollection *collection; /* Active collection */
187 GList *input_streams; /* List of DecodebinInputStream for active collection */
188 GList *output_streams; /* List of DecodebinOutputStream used for output */
189 GList *slots; /* List of MultiQueueSlot */
192 /* selection_lock protects access to following variables */
193 GMutex selection_lock;
194 /* requested selection of stream-id to activate post-multiqueue */
195 GList *requested_selection;
196 /* list of stream-id currently activated in output */
197 GList *active_selection;
198 /* List of stream-id that need to be activated (after a stream switch for ex) */
200 /* Pending select streams event */
201 guint32 select_streams_seqnum;
202 /* pending list of streams to select (from downstream) */
203 GList *pending_select_streams;
204 /* TRUE if requested_selection was updated, will become FALSE once
205 * it has fully transitioned to active */
206 gboolean selection_updated;
207 /* End of variables protected by selection_lock */
209 /* List of pending collections.
210 * FIXME : Is this really needed ? */
211 GList *pending_collection;
/* factories_lock guards the factory caches below; factories_cookie is the
 * registry feature-list cookie the caches were built against */
215 GMutex factories_lock;
216 guint32 factories_cookie;
217 /* All DECODABLE factories */
219 /* Only DECODER factories */
220 GList *decoder_factories;
221 /* DECODABLE but not DECODER factories */
222 GList *decodable_factories;
224 /* counters for pads */
225 guint32 apadcount, vpadcount, tpadcount, opadcount;
/* Class structure of the decodebin3 element. */
231 struct _GstDecodebin3Class
/* Class handler slot for the "select-stream" signal; the signal is
 * registered with an integer return type */
235 gint (*select_stream) (GstDecodebin3 * dbin,
236 GstStreamCollection * collection, GstStream * stream);
239 /* Input of decodebin, controls input pad and parsebin */
240 struct _DecodebinInput
/* Static "sink" pad of the parsebin; the input's ghost sink pad targets it */
247 GstPad *parsebin_sink;
249 GstStreamCollection *collection; /* Active collection */
/* Created lazily the first time the input needs it */
253 GstElement *parsebin;
/* Handler ids of the parsebin "pad-added" / "pad-removed" connections,
 * kept so they can be disconnected when the input is freed */
255 gulong pad_added_sigid;
256 gulong pad_removed_sigid;
258 /* HACK : Remove these fields */
259 /* List of PendingPad structures */
263 /* Multiqueue Slots */
/* One slot describes a sink/src pad pair of the multiqueue together with the
 * input stream feeding it and the output stream (if any) consuming it. */
264 typedef struct _MultiQueueSlot
269 /* Type of stream handled by this slot */
272 /* Linked input and output */
273 DecodebinInputStream *input;
275 /* pending => last stream received on sink pad */
276 GstStream *pending_stream;
277 /* active => last stream outputted on source pad */
278 GstStream *active_stream;
/* The multiqueue pads making up this slot */
280 GstPad *sink_pad, *src_pad;
282 /* id of the MQ src_pad event probe */
287 DecodebinOutputStream *output;
290 /* Streams that are exposed downstream (i.e. output) */
291 struct _DecodebinOutputStream
294 /* The type of stream handled by this output stream */
297 /* The slot to which this output stream is currently connected to */
298 MultiQueueSlot *slot;
300 GstElement *decoder; /* Optional */
/* Sink and source pads of the (optional) decoder */
301 GstPad *decoder_sink, *decoder_src;
306 /* Flag if ghost pad is exposed */
307 gboolean src_exposed;
309 /* keyframe dropping probe */
310 gulong drop_probe_id;
313 /* Pending pads from parsebin */
314 typedef struct _PendingPad
/* Input whose parsebin produced the pad */
317 DecodebinInput *input;
/* Default value of the "caps" property: the static raw caps */
326 #define DEFAULT_CAPS (gst_static_caps_get (&default_raw_caps))
337 SIGNAL_SELECT_STREAM,
/* Signal ids, indexed by the SIGNAL_* values and filled in at class init */
340 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
/* Locking helpers. Each macro wraps g_mutex_lock()/g_mutex_unlock() on the
 * element's selection_lock or input_lock and emits GST_LOG traces around the
 * operation so lock activity can be followed per thread in the debug log. */
342 #define SELECTION_LOCK(dbin) G_STMT_START { \
343 GST_LOG_OBJECT (dbin, \
344 "selection locking from thread %p", \
346 g_mutex_lock (&dbin->selection_lock); \
347 GST_LOG_OBJECT (dbin, \
348 "selection locked from thread %p", \
352 #define SELECTION_UNLOCK(dbin) G_STMT_START { \
353 GST_LOG_OBJECT (dbin, \
354 "selection unlocking from thread %p", \
356 g_mutex_unlock (&dbin->selection_lock); \
359 #define INPUT_LOCK(dbin) G_STMT_START { \
360 GST_LOG_OBJECT (dbin, \
361 "input locking from thread %p", \
363 g_mutex_lock (&dbin->input_lock); \
364 GST_LOG_OBJECT (dbin, \
365 "input locked from thread %p", \
369 #define INPUT_UNLOCK(dbin) G_STMT_START { \
370 GST_LOG_OBJECT (dbin, \
371 "input unlocking from thread %p", \
373 g_mutex_unlock (&dbin->input_lock); \
/* GObject type registration: GstDecodebin3 derives from GstBin, and
 * parent_class is aliased to the generated gst_decodebin3_parent_class. */
376 GType gst_decodebin3_get_type (void);
377 #define gst_decodebin3_parent_class parent_class
378 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
/* Raw caps used as the default for the "caps" property */
380 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
/* Pad templates: the static "sink" pad and the "sink_%u" inputs, then the
 * per-type source pads ("video_%u", "audio_%u", "text_%u") and the generic
 * "src_%u" pad. All of them accept ANY caps. */
382 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
385 GST_STATIC_CAPS_ANY);
387 static GstStaticPadTemplate request_sink_template =
388 GST_STATIC_PAD_TEMPLATE ("sink_%u",
391 GST_STATIC_CAPS_ANY);
393 static GstStaticPadTemplate video_src_template =
394 GST_STATIC_PAD_TEMPLATE ("video_%u",
397 GST_STATIC_CAPS_ANY);
399 static GstStaticPadTemplate audio_src_template =
400 GST_STATIC_PAD_TEMPLATE ("audio_%u",
403 GST_STATIC_CAPS_ANY);
405 static GstStaticPadTemplate text_src_template =
406 GST_STATIC_PAD_TEMPLATE ("text_%u",
409 GST_STATIC_CAPS_ANY);
411 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
414 GST_STATIC_CAPS_ANY);
/* Forward declarations: GObject vmethods (dispose, property accessors) and
 * the parsebin "autoplug-continue" signal callback. */
417 static void gst_decodebin3_dispose (GObject * object);
418 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
419 const GValue * value, GParamSpec * pspec);
420 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
421 GValue * value, GParamSpec * pspec);
423 static gboolean parsebin_autoplug_continue_cb (GstElement *
424 parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
/* Default class handler for the "select-stream" signal. It expresses no
 * preference: per its log message it returns -1, the value documented on the
 * signal as "let decodebin decide". @collection and @stream are not
 * inspected in the visible code. */
427 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
428 GstStreamCollection * collection, GstStream * stream)
430 GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
/* Forward declarations: GstElement / GstBin vmethods */
435 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
436 GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
437 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
438 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
439 GstStateChange transition);
440 static gboolean gst_decodebin3_send_event (GstElement * element,
/* Element factory cache helpers */
443 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
445 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
446 GstElementFactoryListType ftype);
/* Input (sink pad + parsebin) management */
449 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
450 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
451 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
/* Output stream management */
453 static void reconfigure_output_stream (DecodebinOutputStream * output,
454 MultiQueueSlot * slot);
455 static void free_output_stream (GstDecodebin3 * dbin,
456 DecodebinOutputStream * output);
457 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
/* Multiqueue slot management */
460 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
461 GstPadProbeInfo * info, MultiQueueSlot * slot);
462 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
463 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
464 DecodebinInputStream * input);
465 static void link_input_to_slot (DecodebinInputStream * input,
466 MultiQueueSlot * slot);
467 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
469 /* FIXME: Really make all the parser stuff a self-contained helper object */
/* The parsebin-facing code is textually included here, so it shares this
 * file's static declarations */
470 #include "gstdecodebin3-parse.c"
/* Signal accumulator for signals returning an integer.
 *
 * @ihint: invocation hint, used to find out the current emission stage
 * @return_accu: accumulated return value of the emission
 * @handler_return: value returned by the handler that just ran
 * @dummy: unused user data
 *
 * The handler's integer result overwrites the accumulated value, except for
 * handlers running in the G_SIGNAL_RUN_CLEANUP stage, whose result is
 * ignored. */
473 _gst_int_accumulator (GSignalInvocationHint * ihint,
474 GValue * return_accu, const GValue * handler_return, gpointer dummy)
476 gint res = g_value_get_int (handler_return);
/* Results from the cleanup stage must not replace the accumulated value */
478 if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
479 g_value_set_int (return_accu, res);
/* Class initialisation: wires the GObject, GstElement and GstBin virtual
 * methods, installs the "caps" property, registers the "select-stream"
 * signal, adds the pad templates and sets the element metadata. */
488 gst_decodebin3_class_init (GstDecodebin3Class * klass)
490 GObjectClass *gobject_klass = (GObjectClass *) klass;
491 GstElementClass *element_class = (GstElementClass *) klass;
492 GstBinClass *bin_klass = (GstBinClass *) klass;
494 gobject_klass->dispose = gst_decodebin3_dispose;
495 gobject_klass->set_property = gst_decodebin3_set_property;
496 gobject_klass->get_property = gst_decodebin3_get_property;
498 /* FIXME : ADD PROPERTIES ! */
499 g_object_class_install_property (gobject_klass, PROP_CAPS,
500 g_param_spec_boxed ("caps", "Caps",
501 "The caps on which to stop decoding. (NULL = default)",
502 GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
504 /* FIXME : ADD SIGNALS ! */
506 * GstDecodebin3::select-stream
507 * @decodebin: a #GstDecodebin3
508 * @collection: a #GstStreamCollection
509 * @stream: a #GstStream
511 * This signal is emitted whenever @decodebin needs to decide whether
512 * to expose a @stream of a given @collection.
514 * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
515 * A value of -1 (default) lets @decodebin decide what to do with the stream.
/* RUN_LAST signal whose class handler is the select_stream slot; handler
 * results are combined by _gst_int_accumulator */
517 gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
518 g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
519 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
520 _gst_int_accumulator, NULL, g_cclosure_marshal_generic,
521 G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
/* GstElement virtual methods */
524 element_class->request_new_pad =
525 GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
526 element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
527 element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
/* Sink templates first, then the typed and generic source templates */
529 gst_element_class_add_pad_template (element_class,
530 gst_static_pad_template_get (&sink_template));
531 gst_element_class_add_pad_template (element_class,
532 gst_static_pad_template_get (&request_sink_template));
533 gst_element_class_add_pad_template (element_class,
534 gst_static_pad_template_get (&video_src_template));
535 gst_element_class_add_pad_template (element_class,
536 gst_static_pad_template_get (&audio_src_template));
537 gst_element_class_add_pad_template (element_class,
538 gst_static_pad_template_get (&text_src_template));
539 gst_element_class_add_pad_template (element_class,
540 gst_static_pad_template_get (&src_template));
542 gst_element_class_set_static_metadata (element_class,
543 "Decoder Bin 3", "Generic/Bin/Decoder",
544 "Autoplug and decode to raw media",
545 "Edward Hervey <edward@centricular.com>");
/* Messages from children are intercepted by the bin's handle_message */
547 bin_klass->handle_message = gst_decodebin3_handle_message;
/* Default "select-stream" handler */
549 klass->select_stream = gst_decodebin3_select_stream;
/* Instance initialisation: creates the main input, creates and configures
 * the single multiqueue and adds it to the bin, resets the current group id,
 * initialises the mutexes and sets the default caps. */
553 gst_decodebin3_init (GstDecodebin3 * dbin)
555 /* Create main input */
/* NOTE(review): create_new_input() is documented as "Call with INPUT_LOCK
 * taken", but here it runs unlocked and before input_lock is initialised
 * below. Presumably harmless during construction; confirm. */
556 dbin->main_input = create_new_input (dbin, TRUE);
/* NOTE(review): the result of gst_element_factory_make() is used without a
 * NULL check; if the "multiqueue" factory is unavailable the following calls
 * receive NULL. */
558 dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
559 g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
560 "max-size-buffers", 0, "use-interleave", TRUE, NULL);
561 gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
/* G_MAXUINT32 means "no group id seen yet" */
563 dbin->current_group_id = G_MAXUINT32;
565 g_mutex_init (&dbin->factories_lock);
566 g_mutex_init (&dbin->selection_lock);
567 g_mutex_init (&dbin->input_lock);
/* Default value of the "caps" property */
569 dbin->caps = gst_static_caps_get (&default_raw_caps);
573 gst_decodebin3_dispose (GObject * object)
575 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
578 gst_plugin_feature_list_free (dbin->factories);
579 if (dbin->decoder_factories)
580 g_list_free (dbin->decoder_factories);
581 if (dbin->decodable_factories)
582 g_list_free (dbin->decodable_factories);
583 g_list_free (dbin->requested_selection);
584 g_list_free (dbin->active_selection);
585 g_list_free (dbin->to_activate);
586 g_list_free (dbin->pending_select_streams);
588 free_input (dbin, dbin->main_input);
589 /* FIXME : GO OVER INPUTS */
591 G_OBJECT_CLASS (parent_class)->dispose (object);
/* GObject set_property handler. The visible code handles the caps property:
 * under the object lock the previous caps are released and replaced by a
 * copy of the boxed value. Unknown property ids are reported with the
 * standard GObject warning. */
595 gst_decodebin3_set_property (GObject * object, guint prop_id,
596 const GValue * value, GParamSpec * pspec)
598 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
600 /* FIXME : IMPLEMENT */
603 GST_OBJECT_LOCK (dbin);
605 gst_caps_unref (dbin->caps);
/* g_value_dup_boxed() hands back a new reference owned by dbin */
606 dbin->caps = g_value_dup_boxed (value);
607 GST_OBJECT_UNLOCK (dbin);
610 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject get_property handler. The visible code handles the caps property:
 * the current caps are stored into @value while holding the object lock.
 * Unknown property ids are reported with the standard GObject warning. */
616 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
619 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
621 /* FIXME : IMPLEMENT */
624 GST_OBJECT_LOCK (dbin);
625 g_value_set_boxed (value, dbin->caps);
626 GST_OBJECT_UNLOCK (dbin);
629 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Handler for parsebin's "autoplug-continue" signal. Checks whether @caps
 * can intersect the element's target caps (the "caps" property) to decide
 * whether the pad should be exposed as-is. */
635 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
636 GstCaps * caps, GstDecodebin3 * dbin)
638 GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
640 /* If it matches our target caps, expose it */
/* NOTE(review): dbin->caps is read here without the object lock that
 * set_property holds while replacing it; confirm this cannot race. */
641 if (gst_caps_can_intersect (caps, dbin->caps))
647 /* This method should be called whenever a STREAM_START event
648 * comes out of a given parsebin.
649 * The caller shall replace the group_id if the function returns TRUE */
/* @input: the input whose parsebin emitted the event
 * @group_id: in/out; the group id carried by the event on entry, possibly
 *   overwritten with the element-wide current group id */
651 set_input_group_id (DecodebinInput * input, guint32 * group_id)
653 GstDecodebin3 *dbin = input->dbin;
/* Remember the group id seen on this input; a change from a previously
 * known value (anything but G_MAXUINT32) is only warned about */
655 if (input->group_id != *group_id) {
656 if (input->group_id != G_MAXUINT32)
657 GST_WARNING_OBJECT (dbin,
658 "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
659 ") on input %p ", input->group_id, *group_id, input);
660 input->group_id = *group_id;
/* The first group id seen becomes the element-wide one; the caller's value
 * is then rewritten to match the element-wide id */
663 if (*group_id != dbin->current_group_id) {
664 if (dbin->current_group_id == G_MAXUINT32) {
665 GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT,
667 dbin->current_group_id = *group_id;
669 *group_id = dbin->current_group_id;
676 /* Call with INPUT_LOCK taken */
/* Makes sure @input has a usable parsebin. On first use the parsebin is
 * created, its sink pad is looked up and its "pad-added", "pad-removed" and
 * "autoplug-continue" signals are connected. The parsebin is then added to
 * @dbin if it is not already a child, the input's ghost sink pad is pointed
 * at the parsebin sink pad and the parsebin state is synced with its parent.
 * The tail of the function posts a missing-element message for "parsebin"
 * (presumably the failure path taken when the element cannot be created). */
678 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
680 gboolean set_state = FALSE;
682 if (input->parsebin == NULL) {
683 input->parsebin = gst_element_factory_make ("parsebin", NULL);
684 if (input->parsebin == NULL)
/* Keep our own reference in addition to the one the bin will hold */
686 input->parsebin = gst_object_ref (input->parsebin);
687 input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
/* The pad-added / pad-removed handler ids are stored for later disconnection */
688 input->pad_added_sigid =
689 g_signal_connect (input->parsebin, "pad-added",
690 (GCallback) parsebin_pad_added_cb, input);
691 input->pad_removed_sigid =
692 g_signal_connect (input->parsebin, "pad-removed",
693 (GCallback) parsebin_pad_removed_cb, input);
694 g_signal_connect (input->parsebin, "autoplug-continue",
695 (GCallback) parsebin_autoplug_continue_cb, dbin);
/* Only add the parsebin when it is not already a child of this bin */
698 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
699 gst_bin_add (GST_BIN (dbin), input->parsebin);
703 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
704 input->parsebin_sink);
706 gst_element_sync_state_with_parent (input->parsebin);
713 gst_element_post_message ((GstElement *) dbin,
714 gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
/* Link function installed on every input ghost sink pad. Looks up the
 * DecodebinInput attached to @pad under the "decodebin.input" key and makes
 * sure its parsebin exists. The link is refused when the parsebin cannot be
 * set up or when no input is attached to the pad. */
719 static GstPadLinkReturn
720 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
722 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
723 GstPadLinkReturn res = GST_PAD_LINK_OK;
724 DecodebinInput *input;
726 GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT
727 ". Creating parsebin if needed", pad);
729 if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
733 if (!ensure_input_parsebin (dbin, input))
734 res = GST_PAD_LINK_REFUSED;
/* Error exit for a pad that carries no input data */
739 GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
740 return GST_PAD_LINK_REFUSED;
/* Unlink function installed on every input ghost sink pad. Looks up the
 * DecodebinInput attached to @pad and, when its parsebin is currently a
 * child of this bin, removes it from the bin and sets it to NULL state. */
744 gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
746 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
747 DecodebinInput *input;
749 GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT
750 ". Removing parsebin.", pad);
752 if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
756 if (input->parsebin == NULL) {
/* The input keeps its own reference on the parsebin, so the element is
 * still valid for the state change after being removed from the bin */
761 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) {
762 gst_bin_remove (GST_BIN (dbin), input->parsebin);
763 gst_element_set_state (input->parsebin, GST_STATE_NULL);
/* Error exit for a pad that carries no input data */
769 GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
/* Tears down @input: clears the target of its ghost sink pad and removes the
 * pad from @dbin. If a parsebin exists, its pad-added / pad-removed handlers
 * are disconnected, it is set to NULL state, and the references held on the
 * parsebin and on its sink pad are dropped. */
774 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
776 GST_DEBUG ("Freeing input %p", input);
777 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
778 gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
779 if (input->parsebin) {
780 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
781 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
782 gst_element_set_state (input->parsebin, GST_STATE_NULL);
783 gst_object_unref (input->parsebin);
784 gst_object_unref (input->parsebin_sink);
789 /* Call with INPUT_LOCK taken */
/* Allocates a new input and its ghost sink pad.
 *
 * @dbin: the element the input belongs to
 * @main: TRUE for the main input (pad named "sink"), FALSE for a
 *   supplementary input (pad named "sink_%u", numbered with input_counter)
 *
 * The pad gets the input attached under the "decodebin.input" key, the
 * link/unlink functions installed, and is activated and added to @dbin. */
790 static DecodebinInput *
791 create_new_input (GstDecodebin3 * dbin, gboolean main)
793 DecodebinInput *input;
795 input = g_new0 (DecodebinInput, 1);
797 input->is_main = main;
/* G_MAXUINT32 means no group id has been seen on this input yet */
798 input->group_id = G_MAXUINT32;
800 input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
802 gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
803 input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
/* Lets the pad link/unlink functions find their input again */
806 g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
807 gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
808 gst_pad_set_unlink_function (input->ghost_sink,
809 gst_decodebin3_input_pad_unlink);
811 gst_pad_set_active (input->ghost_sink, TRUE);
812 gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
/* GstElement request_new_pad implementation: creates a supplementary
 * (non-main) input, records it in other_inputs and takes that input's ghost
 * sink pad as the result. @temp, @name and @caps are not used in the visible
 * code. */
819 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
820 const gchar * name, const GstCaps * caps)
822 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
823 DecodebinInput *input;
826 /* We are ignoring names for the time being, not sure it makes any sense
827 * within the context of decodebin3 ... */
829 input = create_new_input (dbin, FALSE);
831 dbin->other_inputs = g_list_append (dbin->other_inputs, input);
832 res = input->ghost_sink;
839 /* Must be called with factories lock! */
/* Refreshes the cached element factory lists when they have never been
 * built or when the registry feature-list cookie has changed. The full list
 * of DECODABLE factories (rank MARGINAL and above) is fetched and sorted by
 * rank, then split into decoder_factories (DECODER type) and
 * decodable_factories (everything else). */
841 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
845 cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
846 if (!dbin->factories || dbin->factories_cookie != cookie) {
/* Drop the stale caches before rebuilding them */
849 gst_plugin_feature_list_free (dbin->factories);
850 if (dbin->decoder_factories)
851 g_list_free (dbin->decoder_factories);
852 if (dbin->decodable_factories)
853 g_list_free (dbin->decodable_factories);
855 gst_element_factory_list_get_elements
856 (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
858 g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
859 dbin->factories_cookie = cookie;
861 /* Filter decoder and other decodables */
862 dbin->decoder_factories = NULL;
863 dbin->decodable_factories = NULL;
/* The split lists store the same factory pointers as dbin->factories,
 * without taking additional references */
864 for (tmp = dbin->factories; tmp; tmp = tmp->next) {
865 GstElementFactory *fact = (GstElementFactory *) tmp->data;
866 if (gst_element_factory_list_is_type (fact,
867 GST_ELEMENT_FACTORY_TYPE_DECODER))
868 dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
870 dbin->decodable_factories =
871 g_list_append (dbin->decodable_factories, fact);
876 /* Must be called with appropriate lock if list is a protected variable */
/* Looks for the stream-id @sid in @list, whose elements are treated as
 * stream-id strings and compared with g_strcmp0(). */
878 stream_in_list (GList * list, const gchar * sid)
/* First pass: debug trace of every comparison that will be made */
883 for (tmp = list; tmp; tmp = tmp->next) {
884 gchar *osid = (gchar *) tmp->data;
885 GST_DEBUG ("Checking %s against %s", sid, osid);
/* Second pass: the actual search */
889 for (tmp = list; tmp; tmp = tmp->next) {
890 gchar *osid = (gchar *) tmp->data;
891 if (!g_strcmp0 (sid, osid))
/* Computes the requested selection (list of stream-ids) for @collection and
 * stores it in dbin->requested_selection, under the selection lock.
 *
 * Streams are picked in two passes: first those asked for through the
 * "select-stream" signal or already present in the requested/active
 * selection, then one stream for every stream type not covered yet.
 *
 * NOTE(review): the list stores the stream-id pointers returned by
 * gst_stream_get_stream_id() without copying them, so the strings presumably
 * must outlive the selection; confirm against the streams' lifetime. */
899 update_requested_selection (GstDecodebin3 * dbin,
900 GstStreamCollection * collection)
/* Bitmask of the stream types that already have a selected stream */
904 GstStreamType used_types = 0;
906 nb = gst_stream_collection_get_size (collection);
908 /* 1. Is there a pending SELECT_STREAMS we can return straight away since
909 * the switch handler will take care of the pending selection */
910 SELECTION_LOCK (dbin);
911 if (dbin->pending_select_streams) {
912 GST_DEBUG_OBJECT (dbin,
913 "No need to create pending selection, SELECT_STREAMS underway");
917 /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
918 GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
920 /* 3. If not, check if we already have some of the streams in the
921 * existing active/requested selection */
922 for (i = 0; i < nb; i++) {
923 GstStream *stream = gst_stream_collection_get_stream (collection, i);
924 const gchar *sid = gst_stream_get_stream_id (stream);
926 /* Fire select-stream signal to see if outside components want to
927 * hint at which streams should be selected */
/* NOTE(review): the signal is emitted while the selection lock taken above
 * is still held; a handler that ends up needing that lock would block.
 * Confirm handlers are not expected to do so. */
928 g_signal_emit (G_OBJECT (dbin),
929 gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
931 GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
/* Keep the stream when a handler asked for it (1), or when nobody expressed
 * a preference (-1) and it is already requested or active */
932 if (request == 1 || (request == -1
933 && (stream_in_list (dbin->requested_selection, sid)
934 || stream_in_list (dbin->active_selection, sid)))) {
935 GstStreamType curtype = gst_stream_get_stream_type (stream);
937 GST_DEBUG_OBJECT (dbin,
938 "Using stream requested by 'select-stream' signal : %s", sid);
940 GST_DEBUG_OBJECT (dbin,
941 "Re-using stream already present in requested or active selection : %s",
943 tmp = g_list_append (tmp, (gchar *) sid);
944 used_types |= curtype;
948 /* 4. If not, match one stream of each type */
949 for (i = 0; i < nb; i++) {
950 GstStream *stream = gst_stream_collection_get_stream (collection, i);
951 GstStreamType curtype = gst_stream_get_stream_type (stream);
/* Only the first stream of a type not yet covered gets selected */
952 if (!(used_types & curtype)) {
953 const gchar *sid = gst_stream_get_stream_id (stream);
954 GST_DEBUG_OBJECT (dbin, "Selecting stream '%s' of type %s",
955 sid, gst_stream_type_get_name (curtype));
956 tmp = g_list_append (tmp, (gchar *) sid);
957 used_types |= curtype;
962 /* Finally set the requested selection */
964 if (dbin->requested_selection) {
965 GST_FIXME_OBJECT (dbin,
966 "Replacing non-NULL requested_selection, what should we do ??");
967 g_list_free (dbin->requested_selection);
969 dbin->requested_selection = tmp;
970 dbin->selection_updated = TRUE;
972 SELECTION_UNLOCK (dbin);
975 /* Call with INPUT_LOCK taken */
/* Returns the collection describing all inputs. The first part inspects the
 * main input and the supplementary inputs to decide whether a merge is
 * needed; when it is not, an existing input collection is returned as-is.
 * Otherwise a new collection named "decodebin3" is created and filled with
 * the streams of the main input followed by those of every other input. */
976 static GstStreamCollection *
977 get_merged_collection (GstDecodebin3 * dbin)
979 gboolean needs_merge = FALSE;
980 GstStreamCollection *res = NULL;
984 /* First check if we need to do a merge or just return the only collection */
985 res = dbin->main_input->collection;
987 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
988 DecodebinInput *input = (DecodebinInput *) tmp->data;
989 if (input->collection) {
994 res = input->collection;
/* NOTE(review): in this path the returned pointer is an input's own
 * collection with no reference visibly added, whereas the merged path below
 * returns a newly created object; confirm what callers may assume about
 * ownership. */
999 GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1003 /* We really need to create a new collection */
1004 /* FIXME : Some numbering scheme maybe ?? */
1005 res = gst_stream_collection_new ("decodebin3");
/* Streams of the main input come first */
1006 if (dbin->main_input->collection) {
1007 nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1008 GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1009 for (i = 0; i < nb_stream; i++) {
1011 gst_stream_collection_get_stream (dbin->main_input->collection, i);
1012 gst_stream_collection_add_stream (res, stream);
/* Then the streams of every supplementary input, in list order */
1016 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1017 DecodebinInput *input = (DecodebinInput *) tmp->data;
1018 GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1020 if (input->collection) {
1021 nb_stream = gst_stream_collection_get_size (input->collection);
1022 GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1023 for (i = 0; i < nb_stream; i++) {
1025 gst_stream_collection_get_stream (input->collection, i);
1026 gst_stream_collection_add_stream (res, stream);
1034 /* Call with INPUT_LOCK taken */
/* Finds the input whose parsebin contains @child. Starting from @child, the
 * element hierarchy is walked upwards one parent at a time; each element is
 * compared with the parsebin of the main input and of every supplementary
 * input. The walk stops when there is no parent left or when @dbin itself is
 * reached. */
1035 static DecodebinInput *
1036 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1038 DecodebinInput *input = NULL;
/* A reference is held on the element currently being examined */
1039 GstElement *parent = gst_object_ref (child);
1043 GstElement *next_parent;
1045 GST_DEBUG_OBJECT (dbin, "parent %s",
1046 parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1048 if (parent == dbin->main_input->parsebin) {
1049 input = dbin->main_input;
1052 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1053 DecodebinInput *cur = (DecodebinInput *) tmp->data;
1054 if (parent == cur->parsebin) {
/* Step up: take the parent, then release the element just examined */
1059 next_parent = (GstElement *) gst_element_get_parent (parent);
1060 gst_object_unref (parent);
1061 parent = next_parent;
1063 } while (parent && parent != (GstElement *) dbin);
1066 gst_object_unref (parent);
/* Looks for a stream whose stream-id equals @sid in the element's active
 * collection (dbin->collection). A missing collection is handled up front. */
1072 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1076 if (dbin->collection == NULL)
1078 len = gst_stream_collection_get_size (dbin->collection);
1079 for (i = 0; i < len; i++) {
1080 GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1081 const gchar *osid = gst_stream_get_stream_id (stream);
1082 if (!g_strcmp0 (sid, osid))
1089 /* Call with INPUT_LOCK taken */
/* Records a stream collection posted by @child.
 *
 * The input owning @child is looked up; its collection is replaced by
 * @collection, the per-input collections are merged if needed, the result is
 * dumped to the debug log and finally stored as the element's active
 * collection (releasing the previous one, if any). */
1091 handle_stream_collection (GstDecodebin3 * dbin,
1092 GstStreamCollection * collection, GstElement * child)
1094 #ifndef GST_DISABLE_GST_DEBUG
1095 const gchar *upstream_id;
1098 DecodebinInput *input = find_message_parsebin (dbin, child);
1101 GST_DEBUG_OBJECT (dbin,
1102 "Couldn't find corresponding input, most likely shutting down");
1106 /* Replace collection in input */
1107 if (input->collection)
1108 gst_object_unref (input->collection);
/* NOTE(review): the pointer is stored without a reference being taken in
 * the visible code, while the previous value is unreffed above and the
 * message handler unrefs its own reference after this call; confirm that
 * input->collection cannot outlive the object. */
1109 input->collection = collection;
1110 GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1113 /* Merge collection if needed */
/* From here on, collection designates the merged result */
1114 collection = get_merged_collection (dbin);
1116 #ifndef GST_DISABLE_GST_DEBUG
1117 /* Just some debugging */
1118 upstream_id = gst_stream_collection_get_upstream_id (collection);
1119 GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1120 GST_DEBUG ("From input %p", input);
1121 GST_DEBUG (" %d streams", gst_stream_collection_get_size (collection));
1122 for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1123 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1124 const GstTagList *taglist;
1125 const GstCaps *caps;
1127 GST_DEBUG (" Stream '%s'", gst_stream_get_stream_id (stream));
1128 GST_DEBUG (" type : %s",
1129 gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1130 GST_DEBUG (" flags : 0x%x", gst_stream_get_stream_flags (stream));
1131 taglist = gst_stream_get_tags (stream);
1132 GST_DEBUG (" tags : %" GST_PTR_FORMAT, taglist);
1133 caps = gst_stream_get_caps (stream);
1134 GST_DEBUG (" caps : %" GST_PTR_FORMAT, caps);
1138 /* Store collection for later usage */
1139 if (dbin->collection == NULL) {
1140 dbin->collection = collection;
1142 /* We need to check who emitted this collection (the owner).
1143 * If we already had a collection from that user, this one is an update,
1144 * that is to say that we need to figure out how we are going to re-use
1145 * the streams/slot */
1146 GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1147 /* FIXME : When do we switch from pending collection to active collection ?
1148 * When all streams from active collection are drained in multiqueue output ? */
/* The previous active collection is released and replaced right away */
1149 gst_object_unref (dbin->collection);
1150 dbin->collection = collection;
1151 /* dbin->pending_collection = */
1152 /* g_list_append (dbin->pending_collection, collection); */
/* Message handler for the bin (chained up to the parent class handler).
 *
 * STREAM_COLLECTION messages are intercepted: the collection is handed to
 * handle_stream_collection() and, when dbin->collection ends up being a
 * different object than the one carried by the message, the message is
 * replaced by a new stream-collection message created with dbin as source.
 * After the message went through the parent class handler, the requested
 * selection is refreshed for the stored collection. */
1157 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1159 GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
/* Set when a collection message was processed, so that the selection is
 * only updated after the message has been passed to the parent class */
1160 gboolean posting_collection = FALSE;
1162 GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1164 switch (GST_MESSAGE_TYPE (message)) {
1165 case GST_MESSAGE_STREAM_COLLECTION:
1167 GstStreamCollection *collection = NULL;
1168 gst_message_parse_stream_collection (message, &collection);
1171 handle_stream_collection (dbin, collection,
1172 (GstElement *) GST_MESSAGE_SRC (message));
1173 posting_collection = TRUE;
1174 INPUT_UNLOCK (dbin);
1176 if (dbin->collection && collection != dbin->collection) {
1177 /* Replace collection message, we most likely aggregated it */
1178 GstMessage *new_msg;
1180 gst_message_new_stream_collection ((GstObject *) dbin,
1182 gst_message_unref (message);
/* Release the reference obtained from parsing the message */
1186 gst_object_unref (collection);
/* Let the parent class handle the (possibly replaced) message */
1193 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1195 if (posting_collection) {
1196 /* Figure out a selection for that collection */
1197 update_requested_selection (dbin, dbin->collection);
1201 static DecodebinOutputStream *
1202 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1205 GstStreamType stype = gst_stream_get_stream_type (stream);
1207 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1208 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1209 if (output->type == stype && output->slot && output->slot->active_stream) {
1210 GstStream *tstream = output->slot->active_stream;
1211 if (!stream_in_list (dbin->requested_selection,
1212 (gchar *) gst_stream_get_stream_id (tstream))) {
1221 /* Give a certain slot, figure out if it should be linked to an
1223 * CALL WITH SELECTION LOCK TAKEN !*/
1224 static DecodebinOutputStream *
1225 get_output_for_slot (MultiQueueSlot * slot)
1227 GstDecodebin3 *dbin = slot->dbin;
1228 DecodebinOutputStream *output = NULL;
1229 const gchar *stream_id;
1230 const GstCaps *caps;
1232 /* If we already have a configured output, just use it */
1233 if (slot->output != NULL)
1234 return slot->output;
1239 * This method needs to be split into multiple parts
1241 * 1) Figure out whether stream should be exposed or not
1242 * This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1243 * in the default stream attribution
1245 * 2) Figure out whether an output stream should be created, whether
1246 * we can re-use the output stream already linked to the slot, or
1247 * whether we need to get re-assigned another (currently used) output
1251 stream_id = gst_stream_get_stream_id (slot->active_stream);
1252 caps = gst_stream_get_caps (slot->active_stream);
1253 GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
1255 /* 0. Emit autoplug-continue signal for pending caps ? */
1256 GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
1258 /* 1. if in EXPOSE_ALL_MODE, just accept */
1259 GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
1262 /* FIXME : The idea around this was to avoid activating a stream for
1263 * which we have no decoder. Unfortunately it is way too
1264 * expensive. Need to figure out a better solution */
1265 /* 2. Is there a potential decoder (if one is required) */
1266 if (!gst_caps_can_intersect (caps, dbin->caps)
1267 && !have_factory (dbin, (GstCaps *) caps,
1268 GST_ELEMENT_FACTORY_TYPE_DECODER)) {
1269 GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
1271 SELECTION_UNLOCK (dbin);
1272 gst_element_post_message (GST_ELEMENT_CAST (dbin),
1273 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1274 SELECTION_LOCK (dbin);
1279 /* 3. In default mode check if we should expose */
1280 if (stream_in_list (dbin->requested_selection, stream_id)) {
1281 /* Check if we can steal an existing output stream we could re-use.
1283 * * an output stream whose slot->stream is not in requested
1284 * * and is of the same type as this stream
1286 output = find_free_compatible_output (dbin, slot->active_stream);
1288 /* Move this output from its current slot to this slot */
1290 g_list_append (dbin->to_activate, (gchar *) stream_id);
1291 dbin->requested_selection =
1292 g_list_remove (dbin->requested_selection, stream_id);
1293 SELECTION_UNLOCK (dbin);
1294 gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1295 (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
1296 SELECTION_LOCK (dbin);
1300 output = create_output_stream (dbin, slot->type);
1301 output->slot = slot;
1302 GST_DEBUG ("Linking slot %p to new output %p", slot, output);
1303 slot->output = output;
1304 dbin->active_selection =
1305 g_list_append (dbin->active_selection, (gchar *) stream_id);
1307 GST_DEBUG ("Not creating any output for slot %p", slot);
1312 /* Returns SELECTED_STREAMS message if active_selection is equal to
1313 * requested_selection, else NULL.
1314 * Must be called with LOCK taken */
1316 is_selection_done (GstDecodebin3 * dbin)
1321 if (!dbin->selection_updated)
1324 GST_LOG_OBJECT (dbin, "Checking");
1326 if (dbin->to_activate != NULL) {
1327 GST_DEBUG ("Still have streams to activate");
1330 for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
1331 GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
1332 if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
1333 GST_DEBUG ("Not in active selection, returning");
1338 GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
1340 /* We are completely active */
1341 msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
1342 GST_MESSAGE_SEQNUM (msg) = dbin->select_streams_seqnum;
1343 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1344 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1346 GST_DEBUG_OBJECT (dbin, "Adding stream %s",
1347 gst_stream_get_stream_id (output->slot->active_stream));
1349 gst_message_streams_selected_add (msg, output->slot->active_stream);
1351 GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
1353 dbin->selection_updated = FALSE;
/* Pad probe callback used with a MultiQueueSlot as user data.
 *
 * Events:
 *  - STREAM_START: the GstStream carried by the event becomes the slot's
 *    active_stream (replacing a different previous one).
 *  - CAPS: the output for the slot is looked up under the selection lock,
 *    reconfigured, and the message returned by is_selection_done() is
 *    posted once the lock has been released.
 *  - The branch logging "EOS on multiqueue source pad": when the slot has no
 *    input anymore the event is sent to the peer pad, the slot's output is
 *    removed from dbin->output_streams and freed, and the probe returns
 *    GST_PAD_PROBE_HANDLED.
 * Queries:
 *  - CAPS is answered with ANY caps and ACCEPT_CAPS with TRUE; both return
 *    GST_PAD_PROBE_HANDLED.
 */
1357 static GstPadProbeReturn
1358 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
1359 MultiQueueSlot * slot)
1361 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
1362 GstDecodebin3 *dbin = slot->dbin;
1364 if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
1365 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
1367 GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
1368 switch (GST_EVENT_TYPE (ev)) {
1369 case GST_EVENT_STREAM_START:
1371 GstStream *stream = NULL;
1372 const gchar *stream_id;
1374 gst_event_parse_stream (ev, &stream);
1375 if (stream == NULL) {
1376 GST_ERROR_OBJECT (pad,
1377 "Got a STREAM_START event without a GstStream");
1380 stream_id = gst_stream_get_stream_id (stream);
1381 GST_DEBUG_OBJECT (pad, "Stream Start '%s'", stream_id);
/* Track the stream flowing through this slot. The reference obtained from
 * gst_event_parse_stream() is stored directly in slot->active_stream. */
1382 if (slot->active_stream == NULL) {
1383 slot->active_stream = stream;
1384 } else if (slot->active_stream != stream) {
1385 GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
1386 gst_stream_get_stream_id (slot->active_stream),
1387 gst_stream_get_stream_id (stream));
1388 gst_object_unref (slot->active_stream);
1389 slot->active_stream = stream;
1391 gst_object_unref (stream);
1392 #if 0 /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
1394 gboolean is_active, is_requested;
1395 /* Quick check to see if we're in the current selection */
1396 /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
1397 SELECTION_LOCK (dbin);
1398 GST_DEBUG_OBJECT (dbin, "Checking active selection");
1399 is_active = stream_in_list (dbin->active_selection, stream_id);
1400 GST_DEBUG_OBJECT (dbin, "Checking requested selection");
1401 is_requested = stream_in_list (dbin->requested_selection, stream_id);
1402 SELECTION_UNLOCK (dbin);
1404 GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
1407 GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
1409 else if (slot->output) {
1410 GST_DEBUG_OBJECT (pad,
1411 "Slot needs to be deactivated ? It's no longer in requested selection");
1412 } else if (!is_active)
1413 GST_DEBUG_OBJECT (pad,
1414 "Slot in neither active nor requested selection");
1419 case GST_EVENT_CAPS:
1421 /* Configure the output slot if needed */
1422 DecodebinOutputStream *output;
1423 GstMessage *msg = NULL;
/* Output lookup and reconfiguration happen under the selection lock; the
 * resulting message is only posted after the lock has been dropped */
1424 SELECTION_LOCK (dbin);
1425 output = get_output_for_slot (slot);
1427 reconfigure_output_stream (output, slot);
1428 msg = is_selection_done (dbin);
1430 SELECTION_UNLOCK (dbin);
1432 gst_element_post_message ((GstElement *) slot->dbin, msg);
1436 /* FIXME : Figure out */
1437 GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
/* No input is feeding this slot anymore */
1439 if (slot->input == NULL) {
1441 GST_DEBUG_OBJECT (pad,
1442 "last EOS for input, forwarding and removing slot");
1443 peer = gst_pad_get_peer (pad);
1445 gst_pad_send_event (peer, ev);
1446 gst_object_unref (peer);
1448 SELECTION_LOCK (dbin);
1449 /* FIXME : Shouldn't we try to re-assign the output instead of just
1451 /* Remove the output */
1453 DecodebinOutputStream *output = slot->output;
1454 dbin->output_streams = g_list_remove (dbin->output_streams, output);
1455 free_output_stream (dbin, output);
1457 SELECTION_UNLOCK (dbin);
1458 ret = GST_PAD_PROBE_HANDLED;
/* Queries are answered here instead of being forwarded */
1464 } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
1465 GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
1466 switch (GST_QUERY_TYPE (query)) {
1467 case GST_QUERY_CAPS:
1469 GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
1470 gst_query_set_caps_result (query, gst_caps_new_any ());
1471 ret = GST_PAD_PROBE_HANDLED;
1475 case GST_QUERY_ACCEPT_CAPS:
1477 GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
1478 /* If the current decoder doesn't accept caps, we'll reconfigure
1479 * on the actual caps event. So accept any caps. */
1480 gst_query_set_accept_caps_result (query, TRUE);
1481 ret = GST_PAD_PROBE_HANDLED;
1491 /* Create a new multiqueue slot for the given type
1493 * It is up to the caller to know whether that slot is needed or not
1494 * (and release it when no longer needed) */
1495 static MultiQueueSlot *
1496 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
1498 MultiQueueSlot *slot;
1499 GstIterator *it = NULL;
1500 GValue item = { 0, };
1502 GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
1503 gst_stream_type_get_name (type));
1504 slot = g_new0 (MultiQueueSlot, 1);
1506 slot->id = dbin->slot_id++;
1508 slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u");
1509 if (slot->sink_pad == NULL)
1511 it = gst_pad_iterate_internal_links (slot->sink_pad);
1512 if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
1513 || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
1514 GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
1515 GST_DEBUG_PAD_NAME (slot->src_pad));
1518 gst_iterator_free (it);
1519 g_value_reset (&item);
1521 g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
1523 /* Add event probe */
1525 gst_pad_add_probe (slot->src_pad,
1526 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
1527 (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
1529 GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
1530 GST_DEBUG_PAD_NAME (slot->src_pad));
1531 dbin->slots = g_list_append (dbin->slots, slot);
1538 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
/* Find a multiqueue slot that @input can be linked to.
 *
 * Search order, as visible below:
 *  1. a slot which already has @input as its input;
 *  2. a slot without input whose type matches the type of the input's
 *     active stream, checking the candidates' active stream-id against the
 *     input's stream-id; the chosen slot gets @input assigned;
 *  3. otherwise a slot created with create_new_slot().
 */
1544 static MultiQueueSlot *
1545 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
1548 MultiQueueSlot *empty_slot = NULL;
1549 GstStreamType input_type = 0;
1550 gchar *stream_id = NULL;
1552 GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
1553 input, input->active_stream,
1555 active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
/* Type and stream-id stay 0 / NULL when the input has no active stream */
1557 if (input->active_stream) {
1558 input_type = gst_stream_get_stream_type (input->active_stream);
1559 stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
1562 /* Go over existing slots and check if there is already one for it */
1563 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1564 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1565 /* Already used input, return that one */
1566 if (slot->input == input) {
1567 GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
1572 /* Go amongst all unused slots of the right type and try to find a candidate */
1573 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1574 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1575 if (slot->input == NULL && input_type == slot->type) {
1576 /* Remember this empty slot for later */
1578 /* Check if available slot is of the same stream_id */
1579 GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
1580 slot->id, slot->active_stream);
1581 if (stream_id && slot->active_stream) {
1583 (gchar *) gst_stream_get_stream_id (slot->active_stream);
1584 GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
1585 ostream_id, stream_id);
1586 if (!g_strcmp0 (stream_id, ostream_id))
/* Hand the remembered unused slot over to this input */
1593 GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
1594 empty_slot->input = input;
/* No reusable slot: create a fresh one of the input's type */
1599 return create_new_slot (dbin, input_type);
1605 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
1608 if (slot->input != NULL && slot->input != input) {
1609 GST_ERROR_OBJECT (slot->dbin,
1610 "Trying to link input to an already used slot");
1613 gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
1614 slot->pending_stream = input->active_stream;
1615 slot->input = input;
1616 event = gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
1618 gst_pad_send_event (slot->sink_pad, event);
1623 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
1624 GstElementFactoryListType ftype)
1626 gboolean ret = FALSE;
1629 g_mutex_lock (&dbin->factories_lock);
1630 gst_decode_bin_update_factories_list (dbin);
1631 if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
1633 gst_element_factory_list_filter (dbin->decoder_factories,
1634 caps, GST_PAD_SINK, TRUE);
1637 gst_element_factory_list_filter (dbin->decodable_factories,
1638 caps, GST_PAD_SINK, TRUE);
1639 g_mutex_unlock (&dbin->factories_lock);
1643 gst_plugin_feature_list_free (res);
1651 create_element (GstDecodebin3 * dbin, GstStream * stream,
1652 GstElementFactoryListType ftype)
1655 GstElement *element = NULL;
1657 g_mutex_lock (&dbin->factories_lock);
1658 gst_decode_bin_update_factories_list (dbin);
1659 if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
1661 gst_element_factory_list_filter (dbin->decoder_factories,
1662 gst_stream_get_caps (stream), GST_PAD_SINK, TRUE);
1665 gst_element_factory_list_filter (dbin->decodable_factories,
1666 gst_stream_get_caps (stream), GST_PAD_SINK, TRUE);
1667 g_mutex_unlock (&dbin->factories_lock);
1671 gst_element_factory_create ((GstElementFactory *) res->data, NULL);
1672 GST_DEBUG ("Created element '%s'", GST_ELEMENT_NAME (element));
1673 gst_plugin_feature_list_free (res);
1675 GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT,
1676 gst_stream_get_caps (stream));
1682 /* FIXME : VERY NAIVE. ASSUMING FIRST ONE WILL WORK */
1684 create_decoder (GstDecodebin3 * dbin, GstStream * stream)
1686 return create_element (dbin, stream, GST_ELEMENT_FACTORY_TYPE_DECODER);
1689 static GstPadProbeReturn
1690 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
1691 DecodebinOutputStream * output)
1693 GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
1694 /* If we have a keyframe, remove the probe and let all data through */
1695 /* FIXME : HANDLE HEADER BUFFER ?? */
1696 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1697 GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
1698 GST_DEBUG_OBJECT (pad,
1699 "Buffer is keyframe or header, letting through and removing probe");
1700 output->drop_probe_id = 0;
1701 return GST_PAD_PROBE_REMOVE;
1703 GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
1704 return GST_PAD_PROBE_DROP;
/* (Re)configure @output so that it is fed by @slot.
 *
 * A decoder is considered necessary when the caps of the slot's active
 * stream cannot intersect dbin->caps. Visible steps:
 *  - an existing decoder is kept when an accept-caps query on its sink pad
 *    succeeds (a keyframe-waiter probe is added for video outputs and the
 *    pads are linked if needed); otherwise it is unlinked, set to NULL state
 *    and removed from the bin;
 *  - if a decoder is needed, one is created, added to the bin and linked to
 *    the slot's source pad (a missing-decoder message is posted when none
 *    can be created); if not, the slot's source pad is used directly;
 *  - the output ghost pad gets the resulting pad as target and is added to
 *    the element the first time;
 *  - the trailing "Cleanup" section releases the decoder pads and the
 *    decoder. */
1708 reconfigure_output_stream (DecodebinOutputStream * output,
1709 MultiQueueSlot * slot)
1711 GstDecodebin3 *dbin = output->dbin;
/* NOTE(review): gst_stream_get_caps() is documented as returning a new
 * reference; no matching unref of new_caps is visible in this view --
 * confirm */
1712 GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
1713 gboolean needs_decoder;
1715 needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
1717 GST_DEBUG_OBJECT (dbin,
1718 "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
1721 /* FIXME : Maybe make the output un-hook itself automatically ? */
1722 if (output->slot != NULL && output->slot != slot) {
1723 GST_WARNING_OBJECT (dbin,
1724 "Output still linked to another slot (%p)", output->slot);
1728 /* Check if existing config is reusable as-is by checking if
1729 * the existing decoder accepts the new caps, if not delete
1730 * it and create a new one */
1731 if (output->decoder) {
1732 gboolean can_reuse_decoder;
1734 if (needs_decoder) {
/* NOTE(review): only the return value of gst_pad_query() is used here, not
 * the accept-caps result stored in the query, and no unref of q is visible
 * in this view -- confirm */
1735 GstQuery *q = gst_query_new_accept_caps (new_caps);
1736 can_reuse_decoder = gst_pad_query (output->decoder_sink, q);
1738 can_reuse_decoder = FALSE;
1740 if (can_reuse_decoder) {
1741 if (output->type == GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
1742 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
1743 output->drop_probe_id =
1744 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
1745 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
1747 GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
1748 if (output->linked == FALSE) {
1749 gst_pad_link_full (slot->src_pad, output->decoder_sink,
1750 GST_PAD_LINK_CHECK_NOTHING);
1751 output->linked = TRUE;
1756 GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
1759 gst_pad_unlink (slot->src_pad, output->decoder_sink);
1760 output->linked = FALSE;
1761 if (output->drop_probe_id) {
1762 gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
1763 output->drop_probe_id = 0;
1766 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
1767 GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
1771 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
1772 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
/* Lock the decoder's state before shutting it down and removing it */
1774 gst_element_set_locked_state (output->decoder, TRUE);
1775 gst_element_set_state (output->decoder, GST_STATE_NULL);
1777 gst_bin_remove ((GstBin *) dbin, output->decoder);
1778 output->decoder = NULL;
1781 /* If a decoder is required, create one */
1782 if (needs_decoder) {
1783 /* If we don't have a decoder yet, instantiate one */
1784 output->decoder = create_decoder (dbin, slot->active_stream);
1785 if (output->decoder == NULL) {
1786 SELECTION_UNLOCK (dbin);
1787 /* FIXME : Should we be smarter if there's a missing decoder ?
1788 * Should we deactivate that stream ? */
1789 gst_element_post_message (GST_ELEMENT_CAST (dbin),
1790 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin),
1791 gst_stream_get_caps (slot->active_stream)));
1792 SELECTION_LOCK (dbin);
1795 if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
1796 GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
1799 output->decoder_sink = gst_element_get_static_pad (output->decoder, "sink");
1800 output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
1801 if (output->type == GST_STREAM_TYPE_VIDEO) {
1802 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
1803 output->drop_probe_id =
1804 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
1805 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
1807 if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
1808 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
1809 GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
1810 GST_DEBUG_PAD_NAME (output->decoder_sink));
/* No decoder involved: the slot's source pad is exposed directly */
1814 output->decoder_src = gst_object_ref (slot->src_pad);
1815 output->decoder_sink = NULL;
1817 output->linked = TRUE;
1818 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
1819 output->decoder_src)) {
1820 GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
/* The ghost pad is only added to the element the first time around */
1823 if (output->src_exposed == FALSE) {
1824 output->src_exposed = TRUE;
1825 gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
1828 if (output->decoder)
1829 gst_element_sync_state_with_parent (output->decoder);
1831 output->slot = slot;
/* Release the decoder pads and the decoder itself */
1836 GST_DEBUG_OBJECT (dbin, "Cleanup");
1837 if (output->decoder_sink) {
1838 gst_object_unref (output->decoder_sink);
1839 output->decoder_sink = NULL;
1841 if (output->decoder_src) {
1842 gst_object_unref (output->decoder_src);
1843 output->decoder_src = NULL;
1845 if (output->decoder) {
1846 gst_element_set_state (output->decoder, GST_STATE_NULL);
1847 gst_bin_remove ((GstBin *) dbin, output->decoder);
1848 output->decoder = NULL;
1853 static GstPadProbeReturn
1854 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
1856 GstMessage *msg = NULL;
1857 DecodebinOutputStream *output;
1859 SELECTION_LOCK (slot->dbin);
1860 output = get_output_for_slot (slot);
1862 GST_DEBUG_OBJECT (pad, "output : %p", output);
1865 reconfigure_output_stream (output, slot);
1866 msg = is_selection_done (slot->dbin);
1868 SELECTION_UNLOCK (slot->dbin);
1870 gst_element_post_message ((GstElement *) slot->dbin, msg);
1872 return GST_PAD_PROBE_REMOVE;
1875 static MultiQueueSlot *
1876 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
1880 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1881 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1882 const gchar *stream_id;
1883 if (slot->active_stream) {
1884 stream_id = gst_stream_get_stream_id (slot->active_stream);
1885 if (!g_strcmp0 (sid, stream_id))
1888 if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
1889 stream_id = gst_stream_get_stream_id (slot->pending_stream);
1890 if (!g_strcmp0 (sid, stream_id))
1898 /* This function handles the reassignment of a slot. Call this from
1899 * the streaming thread of a slot. */
1901 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
1903 DecodebinOutputStream *output;
1904 MultiQueueSlot *target_slot = NULL;
1906 const gchar *sid, *tsid;
1908 SELECTION_LOCK (dbin);
1909 output = slot->output;
1911 if (G_UNLIKELY (slot->active_stream == NULL)) {
1912 GST_DEBUG_OBJECT (slot->src_pad,
1913 "Called on inactive slot (active_stream == NULL)");
1914 SELECTION_UNLOCK (dbin);
1918 if (G_UNLIKELY (output == NULL)) {
1919 GST_DEBUG_OBJECT (slot->src_pad,
1920 "Slot doesn't have any output to be removed");
1921 SELECTION_UNLOCK (dbin);
1925 sid = gst_stream_get_stream_id (slot->active_stream);
1926 GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
1928 /* Recheck whether this stream is still in the list of streams to deactivate */
1929 if (stream_in_list (dbin->requested_selection, sid)) {
1930 /* Stream is in the list of requested streams, don't remove */
1931 SELECTION_UNLOCK (dbin);
1932 GST_DEBUG_OBJECT (slot->src_pad,
1933 "Stream '%s' doesn't need to be deactivated", sid);
1937 /* Unlink slot from output */
1938 /* FIXME : Handle flushing ? */
1939 /* FIXME : Handle outputs without decoders */
1940 GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
1941 output->decoder_sink);
1942 if (output->decoder_sink)
1943 gst_pad_unlink (slot->src_pad, output->decoder_sink);
1944 output->linked = FALSE;
1945 slot->output = NULL;
1946 output->slot = NULL;
1947 /* Remove sid from active selection */
1948 for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
1949 if (!g_strcmp0 (sid, tmp->data)) {
1950 dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
1954 /* Can we re-assign this output to a requested stream ? */
1955 GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
1956 for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
1957 MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
1958 GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
1959 tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
1960 if (tslot && tslot->type == output->type && tslot->output == NULL) {
1961 GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
1962 target_slot = tslot;
1964 /* Pass target stream id to requested selection */
1965 dbin->requested_selection =
1966 g_list_append (dbin->requested_selection, tmp->data);
1967 dbin->to_activate = g_list_remove (dbin->to_activate, tmp->data);
1973 GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
1975 target_slot->output = output;
1976 output->slot = target_slot;
1977 dbin->active_selection =
1978 g_list_append (dbin->active_selection, (gchar *) tsid);
1979 SELECTION_UNLOCK (dbin);
1981 /* Wakeup the target slot so that it retries to send events/buffers
1982 * thereby triggering the output reconfiguration codepath */
1983 gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1984 (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
1985 /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
1987 SELECTION_UNLOCK (dbin);
1988 /* FIXME : Remove output if no longer needed ? The tricky part is knowing
1989 * if it's really no longer needed or not */
1990 GST_FIXME_OBJECT (slot->src_pad, "Remove unused output stream ?");
1996 /* Idle probe called when a slot should be unassigned from its output stream.
1997 * This is needed to ensure nothing is flowing when unlinking the slot.
1999 * Also, this method will search for a pending stream which could re-use
2000 * the output stream. */
2001 static GstPadProbeReturn
2002 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2003 MultiQueueSlot * slot)
2005 GstDecodebin3 *dbin = slot->dbin;
2007 reassign_slot (dbin, slot);
2009 return GST_PAD_PROBE_REMOVE;
/* Apply a stream selection request.
 *
 * @select_streams is a list of stream-ids. With the selection lock held:
 *  - each requested id is sorted into: stream known from the collection but
 *    without slot (pending), unknown stream, slot to activate (no output
 *    yet), or already active;
 *  - slots whose active and pending streams are not in @select_streams are
 *    collected as slots to deactivate;
 *  - slots to deactivate are paired with slots to activate of the same type
 *    so that their output can be handed over (to_reassign /
 *    slots_to_reassign);
 *  - dbin->requested_selection and dbin->to_activate are replaced and
 *    dbin->selection_updated is set.
 * Once the lock has been released, an idle probe running
 * slot_unassign_probe() is added on every slot to reassign.
 *
 * The request is dropped when the sequence number checked below differs
 * from dbin->select_streams_seqnum. */
2013 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2016 gboolean ret = TRUE;
2018 /* List of slots to (de)activate. */
2019 GList *to_deactivate = NULL;
2020 GList *to_activate = NULL;
2021 /* List of unknown stream id, most likely means the event
2022 * should be sent upstream so that elements can expose the requested stream */
2023 GList *unknown = NULL;
2024 GList *to_reassign = NULL;
2025 GList *future_request_streams = NULL;
2026 GList *pending_streams = NULL;
2027 GList *slots_to_reassign = NULL;
2029 SELECTION_LOCK (dbin);
/* A newer SELECT_STREAMS superseded this one */
2030 if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2031 GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2032 SELECTION_UNLOCK (dbin);
2035 /* Remove pending select_streams */
2036 g_list_free (dbin->pending_select_streams);
2037 dbin->pending_select_streams = NULL;
2039 /* COMPARE the requested streams to the active and requested streams
2042 /* First check the slots to activate and which ones are unknown */
2043 for (tmp = select_streams; tmp; tmp = tmp->next) {
2044 const gchar *sid = (const gchar *) tmp->data;
2045 MultiQueueSlot *slot;
2046 GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
2047 slot = find_slot_for_stream_id (dbin, sid);
2048 /* Find the corresponding slot */
2050 if (stream_in_collection (dbin, (gchar *) sid)) {
2051 pending_streams = g_list_append (pending_streams, (gchar *) sid);
2053 GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
2054 unknown = g_list_append (unknown, (gchar *) sid);
2056 } else if (slot->output == NULL) {
2057 GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
2059 to_activate = g_list_append (to_activate, slot);
2061 GST_DEBUG_OBJECT (dbin,
2062 "Stream '%s' from slot %p is already active on output %p", sid, slot,
2064 future_request_streams =
2065 g_list_append (future_request_streams, (gchar *) sid);
/* Second pass, over all slots: find the ones that have to be deactivated */
2069 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2070 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2071 /* For slots that have an output, check if it's part of the streams to
2074 gboolean slot_to_deactivate = TRUE;
2076 if (slot->active_stream) {
2077 if (stream_in_list (select_streams,
2078 gst_stream_get_stream_id (slot->active_stream)))
2079 slot_to_deactivate = FALSE;
2081 if (slot_to_deactivate && slot->pending_stream
2082 && slot->pending_stream != slot->active_stream) {
2083 if (stream_in_list (select_streams,
2084 gst_stream_get_stream_id (slot->pending_stream)))
2085 slot_to_deactivate = FALSE;
2087 if (slot_to_deactivate) {
2088 GST_DEBUG_OBJECT (dbin,
2089 "Slot %p (%s) should be deactivated, no longer used", slot,
2090 gst_stream_get_stream_id (slot->active_stream));
2091 to_deactivate = g_list_append (to_deactivate, slot);
2096 if (to_deactivate != NULL) {
2097 GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
2098 /* We need to compare what needs to be activated and deactivated in order
2099 * to determine whether there are outputs that can be transferred */
2100 /* Take the stream-id of the slots that are to be activated, for which there
2101 * is a slot of the same type that needs to be deactivated */
2102 tmp = to_deactivate;
2104 MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
2105 gboolean removeit = FALSE;
2107 GST_DEBUG_OBJECT (dbin,
2108 "Checking if slot to deactivate (%p) has a candidate slot to activate",
2109 slot_to_deactivate);
2110 for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
2111 MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
2112 GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
2113 if (slot_to_activate->type == slot_to_deactivate->type) {
2114 GST_DEBUG_OBJECT (dbin, "Re-using");
2115 to_reassign = g_list_append (to_reassign, (gchar *)
2116 gst_stream_get_stream_id (slot_to_activate->active_stream));
2118 g_list_append (slots_to_reassign, slot_to_deactivate);
2119 to_activate = g_list_remove (to_activate, slot_to_activate);
2126 to_deactivate = g_list_delete_link (to_deactivate, tmp);
2131 for (tmp = to_deactivate; tmp; tmp = tmp->next) {
2132 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2133 GST_DEBUG_OBJECT (dbin,
2134 "Really need to deactivate slot %p, but no available alternative",
2138 /* The only slots left to activate are the ones that won't be reassigned and
2139 * therefore really need to have a new output created */
2140 for (tmp = to_activate; tmp; tmp = tmp->next) {
2141 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2142 if (slot->active_stream)
2143 future_request_streams =
2144 g_list_append (future_request_streams,
2145 (gchar *) gst_stream_get_stream_id (slot->active_stream));
2146 else if (slot->pending_stream)
2147 future_request_streams =
2148 g_list_append (future_request_streams,
2149 (gchar *) gst_stream_get_stream_id (slot->pending_stream));
2151 GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
/* Nothing can be activated right now but some requested streams are known
 * from the collection: keep the caller's list as the requested selection */
2154 if (to_activate == NULL && pending_streams != NULL) {
2155 GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
2156 if (dbin->requested_selection)
2157 g_list_free (dbin->requested_selection);
2158 dbin->requested_selection = select_streams;
2159 g_list_free (to_deactivate);
2160 g_list_free (pending_streams);
2161 to_deactivate = NULL;
2163 if (dbin->requested_selection)
2164 g_list_free (dbin->requested_selection);
2165 dbin->requested_selection = future_request_streams;
2166 dbin->requested_selection =
2167 g_list_concat (dbin->requested_selection, pending_streams);
2168 if (dbin->to_activate)
2169 g_list_free (dbin->to_activate);
2170 dbin->to_activate = to_reassign;
2173 dbin->selection_updated = TRUE;
2174 SELECTION_UNLOCK (dbin);
2177 GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
2179 /* For all streams to deactivate, add an idle probe where we will do
2180 * the unassignment and switch over */
2181 for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
2182 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2183 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2184 (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
2190 static GstPadProbeReturn
2191 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
2192 DecodebinOutputStream * output)
2194 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2195 GstDecodebin3 *dbin = output->dbin;
2196 GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
2198 GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
2200 switch (GST_EVENT_TYPE (event)) {
2201 case GST_EVENT_SELECT_STREAMS:
2204 GList *streams = NULL;
2205 guint32 seqnum = gst_event_get_seqnum (event);
2207 SELECTION_LOCK (dbin);
2208 if (seqnum == dbin->select_streams_seqnum) {
2209 SELECTION_UNLOCK (dbin);
2210 GST_DEBUG_OBJECT (pad,
2211 "Already handled/handling that SELECT_STREAMS event");
2214 dbin->select_streams_seqnum = seqnum;
2215 if (dbin->pending_select_streams != NULL) {
2216 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2217 g_list_free (dbin->pending_select_streams);
2218 dbin->pending_select_streams = NULL;
2220 gst_event_parse_select_streams (event, &streams);
2221 dbin->pending_select_streams = g_list_copy (streams);
2222 SELECTION_UNLOCK (dbin);
2224 /* Send event upstream */
2225 if ((peer = gst_pad_get_peer (pad))) {
2226 gst_pad_send_event (peer, event);
2227 gst_object_unref (peer);
2229 /* Finally handle the switch */
2231 handle_stream_switch (dbin, streams, seqnum);
2232 ret = GST_PAD_PROBE_HANDLED;
2243 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
2245 GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
2246 if (GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
2247 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2248 GList *streams = NULL;
2249 guint32 seqnum = gst_event_get_seqnum (event);
2251 SELECTION_LOCK (dbin);
2252 if (seqnum == dbin->select_streams_seqnum) {
2253 SELECTION_UNLOCK (dbin);
2254 GST_DEBUG_OBJECT (dbin,
2255 "Already handled/handling that SELECT_STREAMS event");
2258 dbin->select_streams_seqnum = seqnum;
2259 if (dbin->pending_select_streams != NULL) {
2260 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2261 g_list_free (dbin->pending_select_streams);
2262 dbin->pending_select_streams = NULL;
2264 gst_event_parse_select_streams (event, &streams);
2265 dbin->pending_select_streams = g_list_copy (streams);
2266 SELECTION_UNLOCK (dbin);
2268 /* FIXME : We don't have an upstream ?? */
2270 /* Send event upstream */
2271 if ((peer = gst_pad_get_peer (pad))) {
2272 gst_pad_send_event (peer, event);
2273 gst_object_unref (peer);
2276 /* Finally handle the switch */
2278 handle_stream_switch (dbin, streams, seqnum);
2282 return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
/* Release what a multiqueue slot holds on to: the probe on its source pad,
 * the link coming from its input's source pad, the multiqueue request pad
 * and the slot's own pad pointers.
 *
 * @dbin: the decodebin3 instance whose multiqueue provided the slot's pads
 * @slot: the slot to tear down */
2287 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
/* Remove the probe registered under slot->probe_id from the source pad */
2290 gst_pad_remove_probe (slot->src_pad, slot->probe_id);
/* Disconnect the input feeding this slot, when that input has a source pad */
2292 if (slot->input->srcpad)
2293 gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
/* Hand the request pad back to the multiqueue, then reset both pad pointers
 * to NULL (gst_object_replace() drops the reference previously stored) */
2296 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2297 gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
2298 gst_object_replace ((GstObject **) & slot->src_pad, NULL);
2302 /* Create a DecodebinOutputStream for a given type
2303 * Note: It will be empty initially, it needs to be configured */
/* Allocates a zero-initialised DecodebinOutputStream for @type and gives it
 * an active, target-less ghost source pad created from the pad template that
 * matches the stream type. ghost_pad_event_probe() is installed on that
 * pad's internal proxy pad for upstream events, with the new output stream
 * as user data, and the stream is appended to dbin->output_streams.
 *
 * @dbin: the decodebin3 instance that will own the output stream
 * @type: stream type, selects the pad template and the per-type pad counter
 *
 * NOTE(review): the return statement is not visible here; presumably the
 * newly created stream ('res') is returned - confirm. */
2305 static DecodebinOutputStream *
2306 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
2308 DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
2310 const gchar *prefix;
2311 GstStaticPadTemplate *templ;
2312 GstPadTemplate *ptmpl;
2314 GstPad *internal_pad;
2316 GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
2317 res, gst_stream_type_get_name (type));
/* Select the static pad template and the pad counter for this stream type;
 * anything that is not video/audio/text uses the generic src template */
2323 case GST_STREAM_TYPE_VIDEO:
2324 templ = &video_src_template;
2325 counter = &dbin->vpadcount;
2328 case GST_STREAM_TYPE_AUDIO:
2329 templ = &audio_src_template;
2330 counter = &dbin->apadcount;
2333 case GST_STREAM_TYPE_TEXT:
2334 templ = &text_src_template;
2335 counter = &dbin->tpadcount;
2339 templ = &src_template;
2340 counter = &dbin->opadcount;
/* Pad name is the type prefix followed by '_' and the current counter value */
2345 pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
/* Build the ghost pad from a temporary GstPadTemplate; the template
 * reference obtained here is dropped again right after pad creation */
2347 ptmpl = gst_static_pad_template_get (templ);
2348 res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
2349 gst_object_unref (ptmpl);
2351 gst_pad_set_active (res->src_pad, TRUE);
2352 /* Put an event probe on the internal proxy pad to detect upstream
2355 (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
2356 gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
2357 (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
2358 gst_object_unref (internal_pad);
2360 dbin->output_streams = g_list_append (dbin->output_streams, res);
/* Tear down an output stream: detach it from its multiqueue slot, clear its
 * decoder pad pointers, untarget its ghost pad, remove that pad from
 * decodebin3 if it had been exposed, and shut down and remove the decoder.
 *
 * @dbin: the decodebin3 instance the pad and decoder were added to
 * @output: the output stream to tear down */
2366 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
/* With a decoder in place, the slot's source pad is linked to the decoder's
 * sink pad: undo that link */
2369 if (output->decoder_sink && output->decoder)
2370 gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
/* Break the slot <-> output association in both directions */
2372 output->slot->output = NULL;
2373 output->slot = NULL;
/* Reset the decoder pad pointers (dropping the stored references) and leave
 * the ghost pad without a target */
2375 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2376 gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
2377 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
/* The source pad is only removed from the element when src_exposed is set */
2378 if (output->src_exposed) {
2379 gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
/* Lock the decoder's state so that parent state changes no longer affect
 * it, bring it to NULL and take it out of the bin */
2381 if (output->decoder) {
2382 gst_element_set_locked_state (output->decoder, TRUE);
2383 gst_element_set_state (output->decoder, GST_STATE_NULL);
2384 gst_bin_remove ((GstBin *) dbin, output->decoder);
/* State change handler. Chains up to the parent class' change_state and,
 * for the PAUSED->READY transition, frees every output stream and every
 * multiqueue slot and releases the lists that held them.
 *
 * @element: the decodebin3 element
 * @transition: the state transition being performed
 *
 * NOTE(review): the handling of a failed parent state change and the final
 * return are not visible here; presumably 'ret' is returned - confirm. */
2389 static GstStateChangeReturn
2390 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
2392 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2393 GstStateChangeReturn ret;
2396 switch (transition) {
/* Let the parent class perform the transition; its result is checked for
 * GST_STATE_CHANGE_FAILURE right after */
2400 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2401 if (ret == GST_STATE_CHANGE_FAILURE)
2404 switch (transition) {
2405 case GST_STATE_CHANGE_PAUSED_TO_READY:
2409 /* Free output streams */
2410 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2411 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2412 free_output_stream (dbin, output);
/* The elements were dealt with above, only the list itself is freed here */
2414 g_list_free (dbin->output_streams);
2415 dbin->output_streams = NULL;
2416 /* Free multiqueue slots */
2417 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2418 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2419 free_multiqueue_slot (dbin, slot);
2421 g_list_free (dbin->slots);
/* Plugin-level initialisation for decodebin3: sets up the "decodebin3" debug
 * category and registers the element type under the name "decodebin3" with
 * rank GST_RANK_NONE.
 *
 * @plugin: the plugin the element is registered with
 *
 * Returns the result of gst_element_register(). */
2434 gst_decodebin3_plugin_init (GstPlugin * plugin)
2436 GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");
2438 return gst_element_register (plugin, "decodebin3", GST_RANK_NONE,
2439 GST_TYPE_DECODEBIN3);