decodebin3: allow to call "dispose" multiple times
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-base / gst / playback / gstdecodebin3.c
1 /* GStreamer
2  *
3  * Copyright (C) <2015> Centricular Ltd
4  *  @author: Edward Hervey <edward@centricular.com>
5  *  @author: Jan Schmidt <jan@centricular.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <glib.h>
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
30 #include <gst/gst.h>
31 #include <gst/pbutils/pbutils.h>
32
33 #include "gstplaybackelements.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
36
37 /**
38  * SECTION:element-decodebin3
39  * @title: decodebin3
40  *
41  * #GstBin that auto-magically constructs a decoding pipeline using available
42  * decoders and demuxers via auto-plugging. The output is raw audio, video
43  * or subtitle streams.
44  *
45  * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
46  *
47  * * supports publication and selection of stream information via
48  * GstStreamCollection messages and #GST_EVENT_SELECT_STREAMS events.
49  *
50  * * dynamically switches stream connections internally, and
51  * reuses decoder elements when stream selections change, so that in
52  * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53  * and only creates new elements when streams change and an existing decoder
54  * is not capable of handling the new format.
55  *
56  * * supports multiple input pads for the parallel decoding of auxiliary streams
57  * not muxed with the primary stream.
58  *
59  * * does not handle network stream buffering. decodebin3 expects that network stream
60  * buffering is handled upstream, before data is passed to it.
61  *
62  * > decodebin3 is still experimental API and a technology preview.
63  * > Its behaviour and exposed API is subject to change.
64  *
65  */
66
67 /*
68  * Global design
69  *
70  * 1) From sink pad to elementary streams (GstParseBin)
71  *
72  * The input sink pads are fed to GstParseBin. GstParseBin will feed them
73  * through typefind. When the caps are detected (or changed) we recursively
74  * figure out which demuxer, parser or depayloader is needed until we get to
75  * elementary streams.
76  *
77  * All elementary streams (whether decoded or not, whether exposed or not) are
78  * fed through multiqueue. There is only *one* multiqueue in decodebin3.
79  *
80  * => MultiQueue is the cornerstone.
81  * => No buffering before multiqueue
82  *
83  * 2) Elementary streams
84  *
85  * After GstParseBin, there are 3 main components:
86  *  1) Input Streams (provided by GstParseBin)
87  *  2) Multiqueue slots
88  *  3) Output Streams
89  *
90  * Input Streams correspond to the stream coming from GstParseBin and that gets
91  * fed into a multiqueue slot.
92  *
93  * Output Streams correspond to the combination of a (optional) decoder and an
94  * output ghostpad. Output Streams can be moved from one multiqueue slot to
95  * another, can reconfigure itself (different decoders), and can be
96  * added/removed depending on the configuration (all streams outputted, only one
97  * of each type, ...).
98  *
99  * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
100  * each 'active' Input Stream there is a corresponding slot.
101  * Slots might have different streams on input and output (due to internal
102  * buffering).
103  *
104  * Due to internal queuing/buffering/..., all those components (might) behave
105  * asynchronously. Therefore probes will be used on each component source pad to
106  * detect various key-points:
107  *  * EOS :
108  *     the stream is done => Mark that component as done, optionally freeing/removing it
109  *  * STREAM_START :
110  *     a new stream is starting => link it further if needed
111  *
112  * 3) Gradual replacement
113  *
114  * If the caps change at any point in decodebin (input sink pad, demuxer output,
115  * multiqueue output, ..), we gradually replace (if needed) the following elements.
116  *
117  * This is handled by the probes in various locations:
118  *  a) typefind output
119  *  b) multiqueue input (source pad of Input Streams)
120  *  c) multiqueue output (source pad of Multiqueue Slots)
121  *  d) final output (target of source ghostpads)
122  *
123  * When CAPS event arrive at those points, one of three things can happen:
124  * a) There is no elements downstream yet, just create/link-to following elements
125  * b) There are downstream elements, do a ACCEPT_CAPS query
126  *  b.1) The new CAPS are accepted, keep current configuration
127  *  b.2) The new CAPS are not accepted, remove following elements then do a)
128  *
129  *    Components:
130  *
131  *                                                   MultiQ     Output
132  *                     Input(s)                      Slots      Streams
133  *  /-------------------------------------------\   /-----\  /------------- \
134  *
135  * +-------------------------------------------------------------------------+
136  * |                                                                         |
137  * | +---------------------------------------------+                         |
138  * | |   GstParseBin(s)                            |                         |
139  * | |                +--------------+             |  +-----+                |
140  * | |                |              |---[parser]-[|--| Mul |---[ decoder ]-[|
141  * |]--[ typefind ]---|  demuxer(s)  |------------[|  | ti  |                |
142  * | |                |  (if needed) |---[parser]-[|--| qu  |                |
143  * | |                |              |---[parser]-[|--| eu  |---[ decoder ]-[|
144  * | |                +--------------+             |  +------             ^  |
145  * | +---------------------------------------------+        ^             |  |
146  * |                                               ^        |             |  |
147  * +-----------------------------------------------+--------+-------------+--+
148  *                                                 |        |             |
149  *                                                 |        |             |
150  *                                       Probes  --/--------/-------------/
151  *
152  * ATOMIC SWITCHING
153  *
154  * We want to ensure we re-use decoders when switching streams. This takes place
155  * at the multiqueue output level.
156  *
157  * MAIN CONCEPTS
158  *  1) Activating a stream (i.e. linking a slot to an output) is only done within
159  *    the streaming thread in the multiqueue_src_probe() and only if the
160       stream is in the REQUESTED selection.
161  *  2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
162  *    within the stream thread, but only in a purposefully called IDLE probe
163  *    that calls reassign_slot().
164  *
165  * Based on those two principles, 3 "selection" of streams (stream-id) are used:
166  * 1) requested_selection
167  *    All streams within that list should be activated
168  * 2) active_selection
169  *    List of streams that are exposed by decodebin
170  * 3) to_activate
171  *    List of streams that will be moved to requested_selection in the
172  *    reassign_slot() method (i.e. once a stream was deactivated, and the output
173  *    was retargetted)
174  */
175
176
177 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
178 #define GST_CAT_DEFAULT decodebin3_debug
179
180 #define GST_TYPE_DECODEBIN3      (gst_decodebin3_get_type ())
181
182 #define EXTRA_DEBUG 1
183
184 #define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
185 #define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"
186 static GQuark
187 _custom_final_eos_quark_get (void)
188 {
189   static gsize g_quark;
190
191   if (g_once_init_enter (&g_quark)) {
192     gsize quark =
193         (gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
194     g_once_init_leave (&g_quark, quark);
195   }
196   return g_quark;
197 }
198
199 typedef struct _GstDecodebin3 GstDecodebin3;
200 typedef struct _GstDecodebin3Class GstDecodebin3Class;
201
202 typedef struct _DecodebinInputStream DecodebinInputStream;
203 typedef struct _DecodebinInput DecodebinInput;
204 typedef struct _DecodebinOutputStream DecodebinOutputStream;
205
206 struct _GstDecodebin3
207 {
208   GstBin bin;
209
210   /* input_lock protects the following variables */
211   GMutex input_lock;
212   /* Main input (static sink pad) */
213   DecodebinInput *main_input;
214   /* Supplementary input (request sink pads) */
215   GList *other_inputs;
216   /* counter for input */
217   guint32 input_counter;
218   /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
219   /* FIXME : Needs to be reset appropriately (when upstream changes ?) */
220   guint32 current_group_id;
221   /* End of variables protected by input_lock */
222
223   GstElement *multiqueue;
224   GstClockTime default_mq_min_interleave;
225   GstClockTime current_mq_min_interleave;
226
227   /* selection_lock protects access to following variables */
228   GMutex selection_lock;
229   GList *input_streams;         /* List of DecodebinInputStream for active collection */
230   GList *output_streams;        /* List of DecodebinOutputStream used for output */
231   GList *slots;                 /* List of MultiQueueSlot */
232   guint slot_id;
233
234   /* Active collection */
235   GstStreamCollection *collection;
236   /* requested selection of stream-id to activate post-multiqueue */
237   GList *requested_selection;
238   /* list of stream-id currently activated in output */
239   GList *active_selection;
240   /* List of stream-id that need to be activated (after a stream switch for ex) */
241   GList *to_activate;
242   /* Pending select streams event */
243   guint32 select_streams_seqnum;
244   /* pending list of streams to select (from downstream) */
245   GList *pending_select_streams;
246   /* TRUE if requested_selection was updated, will become FALSE once
247    * it has fully transitioned to active */
248   gboolean selection_updated;
249   /* End of variables protected by selection_lock */
250   gboolean upstream_selected;
251
252   /* List of pending collections.
253    * FIXME : Is this really needed ? */
254   GList *pending_collection;
255
256   /* Factories */
257   GMutex factories_lock;
258   guint32 factories_cookie;
259   /* All DECODABLE factories */
260   GList *factories;
261   /* Only DECODER factories */
262   GList *decoder_factories;
263   /* DECODABLE but not DECODER factories */
264   GList *decodable_factories;
265
266   /* counters for pads */
267   guint32 apadcount, vpadcount, tpadcount, opadcount;
268
269   /* Properties */
270   GstCaps *caps;
271 };
272
273 struct _GstDecodebin3Class
274 {
275   GstBinClass class;
276
277     gint (*select_stream) (GstDecodebin3 * dbin,
278       GstStreamCollection * collection, GstStream * stream);
279 };
280
281 /* Input of decodebin, controls input pad and parsebin */
282 struct _DecodebinInput
283 {
284   GstDecodebin3 *dbin;
285
286   gboolean is_main;
287
288   GstPad *ghost_sink;
289   GstPad *parsebin_sink;
290
291   GstStreamCollection *collection;      /* Active collection */
292   gboolean upstream_selected;
293
294   guint group_id;
295
296   GstElement *parsebin;
297
298   gulong pad_added_sigid;
299   gulong pad_removed_sigid;
300   gulong drained_sigid;
301
302   /* TRUE if the input got drained
303    * FIXME : When do we reset it if re-used ?
304    */
305   gboolean drained;
306
307   /* HACK : Remove these fields */
308   /* List of PendingPad structures */
309   GList *pending_pads;
310 };
311
312 /* Multiqueue Slots */
313 typedef struct _MultiQueueSlot
314 {
315   guint id;
316
317   GstDecodebin3 *dbin;
318   /* Type of stream handled by this slot */
319   GstStreamType type;
320
321   /* Linked input and output */
322   DecodebinInputStream *input;
323
324   /* pending => last stream received on sink pad */
325   GstStream *pending_stream;
326   /* active => last stream outputted on source pad */
327   GstStream *active_stream;
328
329   GstPad *sink_pad, *src_pad;
330
331   /* id of the MQ src_pad event probe */
332   gulong probe_id;
333
334   gboolean is_drained;
335
336   DecodebinOutputStream *output;
337 } MultiQueueSlot;
338
339 /* Streams that are exposed downstream (i.e. output) */
340 struct _DecodebinOutputStream
341 {
342   GstDecodebin3 *dbin;
343   /* The type of stream handled by this output stream */
344   GstStreamType type;
345
346   /* The slot to which this output stream is currently connected to */
347   MultiQueueSlot *slot;
348
349   GstElement *decoder;          /* Optional */
350   GstPad *decoder_sink, *decoder_src;
351   gboolean linked;
352
353   /* ghostpad */
354   GstPad *src_pad;
355   /* Flag if ghost pad is exposed */
356   gboolean src_exposed;
357
358   /* Reported decoder latency */
359   GstClockTime decoder_latency;
360
361   /* keyframe dropping probe */
362   gulong drop_probe_id;
363 };
364
365 /* Pending pads from parsebin */
366 typedef struct _PendingPad
367 {
368   GstDecodebin3 *dbin;
369   DecodebinInput *input;
370   GstPad *pad;
371
372   gulong buffer_probe;
373   gulong event_probe;
374   gboolean saw_eos;
375 } PendingPad;
376
377 /* properties */
378 enum
379 {
380   PROP_0,
381   PROP_CAPS
382 };
383
384 /* signals */
385 enum
386 {
387   SIGNAL_SELECT_STREAM,
388   SIGNAL_ABOUT_TO_FINISH,
389   LAST_SIGNAL
390 };
391 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
392
393 #define SELECTION_LOCK(dbin) G_STMT_START {                             \
394     GST_LOG_OBJECT (dbin,                                               \
395                     "selection locking from thread %p",                 \
396                     g_thread_self ());                                  \
397     g_mutex_lock (&dbin->selection_lock);                               \
398     GST_LOG_OBJECT (dbin,                                               \
399                     "selection locked from thread %p",                  \
400                     g_thread_self ());                                  \
401   } G_STMT_END
402
403 #define SELECTION_UNLOCK(dbin) G_STMT_START {                           \
404     GST_LOG_OBJECT (dbin,                                               \
405                     "selection unlocking from thread %p",               \
406                     g_thread_self ());                                  \
407     g_mutex_unlock (&dbin->selection_lock);                             \
408   } G_STMT_END
409
410 #define INPUT_LOCK(dbin) G_STMT_START {                         \
411     GST_LOG_OBJECT (dbin,                                               \
412                     "input locking from thread %p",                     \
413                     g_thread_self ());                                  \
414     g_mutex_lock (&dbin->input_lock);                           \
415     GST_LOG_OBJECT (dbin,                                               \
416                     "input locked from thread %p",                      \
417                     g_thread_self ());                                  \
418   } G_STMT_END
419
420 #define INPUT_UNLOCK(dbin) G_STMT_START {                               \
421     GST_LOG_OBJECT (dbin,                                               \
422                     "input unlocking from thread %p",           \
423                     g_thread_self ());                                  \
424     g_mutex_unlock (&dbin->input_lock);                         \
425   } G_STMT_END
426
427 GType gst_decodebin3_get_type (void);
428 #define gst_decodebin3_parent_class parent_class
429 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
430 #define _do_init \
431     GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");\
432     playback_element_init (plugin);
433 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (decodebin3, "decodebin3", GST_RANK_NONE,
434     GST_TYPE_DECODEBIN3, _do_init);
435
436 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
437
438 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
439     GST_PAD_SINK,
440     GST_PAD_ALWAYS,
441     GST_STATIC_CAPS_ANY);
442
443 static GstStaticPadTemplate request_sink_template =
444 GST_STATIC_PAD_TEMPLATE ("sink_%u",
445     GST_PAD_SINK,
446     GST_PAD_REQUEST,
447     GST_STATIC_CAPS_ANY);
448
449 static GstStaticPadTemplate video_src_template =
450 GST_STATIC_PAD_TEMPLATE ("video_%u",
451     GST_PAD_SRC,
452     GST_PAD_SOMETIMES,
453     GST_STATIC_CAPS_ANY);
454
455 static GstStaticPadTemplate audio_src_template =
456 GST_STATIC_PAD_TEMPLATE ("audio_%u",
457     GST_PAD_SRC,
458     GST_PAD_SOMETIMES,
459     GST_STATIC_CAPS_ANY);
460
461 static GstStaticPadTemplate text_src_template =
462 GST_STATIC_PAD_TEMPLATE ("text_%u",
463     GST_PAD_SRC,
464     GST_PAD_SOMETIMES,
465     GST_STATIC_CAPS_ANY);
466
467 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
468     GST_PAD_SRC,
469     GST_PAD_SOMETIMES,
470     GST_STATIC_CAPS_ANY);
471
472
473 static void gst_decodebin3_dispose (GObject * object);
474 static void gst_decodebin3_finalize (GObject * object);
475 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
476     const GValue * value, GParamSpec * pspec);
477 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
478     GValue * value, GParamSpec * pspec);
479
480 static gboolean parsebin_autoplug_continue_cb (GstElement *
481     parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
482
483 static gint
484 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
485     GstStreamCollection * collection, GstStream * stream)
486 {
487   GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
488
489   return -1;
490 }
491
492 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
493     GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
494 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
495 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
496     GstStateChange transition);
497 static gboolean gst_decodebin3_send_event (GstElement * element,
498     GstEvent * event);
499
500 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
501 #if 0
502 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
503     GstElementFactoryListType ftype);
504 #endif
505
506 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
507 static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input);
508 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
509 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
510
511 static void reconfigure_output_stream (DecodebinOutputStream * output,
512     MultiQueueSlot * slot);
513 static void free_output_stream (GstDecodebin3 * dbin,
514     DecodebinOutputStream * output);
515 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
516     GstStreamType type);
517
518 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
519     GstPadProbeInfo * info, MultiQueueSlot * slot);
520 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
521 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
522     DecodebinInputStream * input);
523 static void link_input_to_slot (DecodebinInputStream * input,
524     MultiQueueSlot * slot);
525 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
526 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
527     MultiQueueSlot * slot);
528
529 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
530 static void update_requested_selection (GstDecodebin3 * dbin);
531
532 /* FIXME: Really make all the parser stuff a self-contained helper object */
533 #include "gstdecodebin3-parse.c"
534
535 static gboolean
536 _gst_int_accumulator (GSignalInvocationHint * ihint,
537     GValue * return_accu, const GValue * handler_return, gpointer dummy)
538 {
539   gint res = g_value_get_int (handler_return);
540
541   g_value_set_int (return_accu, res);
542
543   if (res == -1)
544     return TRUE;
545
546   return FALSE;
547 }
548
549 static void
550 gst_decodebin3_class_init (GstDecodebin3Class * klass)
551 {
552   GObjectClass *gobject_klass = (GObjectClass *) klass;
553   GstElementClass *element_class = (GstElementClass *) klass;
554   GstBinClass *bin_klass = (GstBinClass *) klass;
555
556   gobject_klass->dispose = gst_decodebin3_dispose;
557   gobject_klass->finalize = gst_decodebin3_finalize;
558   gobject_klass->set_property = gst_decodebin3_set_property;
559   gobject_klass->get_property = gst_decodebin3_get_property;
560
561   /* FIXME : ADD PROPERTIES ! */
562   g_object_class_install_property (gobject_klass, PROP_CAPS,
563       g_param_spec_boxed ("caps", "Caps",
564           "The caps on which to stop decoding. (NULL = default)",
565           GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
566
567   /* FIXME : ADD SIGNALS ! */
568   /**
569    * GstDecodebin3::select-stream
570    * @decodebin: a #GstDecodebin3
571    * @collection: a #GstStreamCollection
572    * @stream: a #GstStream
573    *
574    * This signal is emitted whenever @decodebin needs to decide whether
575    * to expose a @stream of a given @collection.
576    *
577    * Note that the prefered way to select streams is to listen to
578    * GST_MESSAGE_STREAM_COLLECTION on the bus and send a
579    * GST_EVENT_SELECT_STREAMS with the streams the user wants.
580    *
581    * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
582    * A value of -1 (default) lets @decodebin decide what to do with the stream.
583    * */
584   gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
585       g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
586       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
587       _gst_int_accumulator, NULL, NULL,
588       G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
589
590   /**
591    * GstDecodebin3::about-to-finish:
592    *
593    * This signal is emitted when the data for the selected URI is
594    * entirely buffered and it is safe to specify another URI.
595    */
596   gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
597       g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
598       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
599
600
601   element_class->request_new_pad =
602       GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
603   element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
604   element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
605
606   gst_element_class_add_pad_template (element_class,
607       gst_static_pad_template_get (&sink_template));
608   gst_element_class_add_pad_template (element_class,
609       gst_static_pad_template_get (&request_sink_template));
610   gst_element_class_add_pad_template (element_class,
611       gst_static_pad_template_get (&video_src_template));
612   gst_element_class_add_pad_template (element_class,
613       gst_static_pad_template_get (&audio_src_template));
614   gst_element_class_add_pad_template (element_class,
615       gst_static_pad_template_get (&text_src_template));
616   gst_element_class_add_pad_template (element_class,
617       gst_static_pad_template_get (&src_template));
618
619   gst_element_class_set_static_metadata (element_class,
620       "Decoder Bin 3", "Generic/Bin/Decoder",
621       "Autoplug and decode to raw media",
622       "Edward Hervey <edward@centricular.com>");
623
624   bin_klass->handle_message = gst_decodebin3_handle_message;
625
626   klass->select_stream = gst_decodebin3_select_stream;
627 }
628
629 static void
630 gst_decodebin3_init (GstDecodebin3 * dbin)
631 {
632   /* Create main input */
633   dbin->main_input = create_new_input (dbin, TRUE);
634
635   dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
636   g_object_get (dbin->multiqueue, "min-interleave-time",
637       &dbin->default_mq_min_interleave, NULL);
638   dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
639   g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
640       "max-size-buffers", 0, "use-interleave", TRUE, NULL);
641   gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
642
643   dbin->current_group_id = GST_GROUP_ID_INVALID;
644
645   g_mutex_init (&dbin->factories_lock);
646   g_mutex_init (&dbin->selection_lock);
647   g_mutex_init (&dbin->input_lock);
648
649   dbin->caps = gst_static_caps_get (&default_raw_caps);
650
651   GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
652 }
653
654 static void
655 gst_decodebin3_dispose (GObject * object)
656 {
657   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
658   GList *walk, *next;
659
660   if (dbin->factories) {
661     gst_plugin_feature_list_free (dbin->factories);
662     dbin->factories = NULL;
663   }
664   if (dbin->decoder_factories) {
665     g_list_free (dbin->decoder_factories);
666     dbin->decoder_factories = NULL;
667   }
668   if (dbin->decodable_factories) {
669     g_list_free (dbin->decodable_factories);
670     dbin->decodable_factories = NULL;
671   }
672   g_list_free_full (dbin->requested_selection, g_free);
673   g_list_free_full (dbin->active_selection, g_free);
674   g_list_free (dbin->to_activate);
675   g_list_free (dbin->pending_select_streams);
676
677   dbin->requested_selection = NULL;
678   dbin->active_selection = NULL;
679   dbin->to_activate = NULL;
680   dbin->pending_select_streams = NULL;
681
682   g_clear_object (&dbin->collection);
683
684   if (dbin->main_input) {
685     free_input (dbin, dbin->main_input);
686     dbin->main_input = NULL;
687   }
688
689   for (walk = dbin->other_inputs; walk; walk = next) {
690     DecodebinInput *input = walk->data;
691
692     next = g_list_next (walk);
693
694     free_input (dbin, input);
695     dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
696   }
697
698   G_OBJECT_CLASS (parent_class)->dispose (object);
699 }
700
701 static void
702 gst_decodebin3_finalize (GObject * object)
703 {
704   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
705
706   g_mutex_clear (&dbin->factories_lock);
707   g_mutex_clear (&dbin->selection_lock);
708   g_mutex_clear (&dbin->input_lock);
709
710   G_OBJECT_CLASS (parent_class)->finalize (object);
711 }
712
713 static void
714 gst_decodebin3_set_property (GObject * object, guint prop_id,
715     const GValue * value, GParamSpec * pspec)
716 {
717   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
718
719   /* FIXME : IMPLEMENT */
720   switch (prop_id) {
721     case PROP_CAPS:
722       GST_OBJECT_LOCK (dbin);
723       if (dbin->caps)
724         gst_caps_unref (dbin->caps);
725       dbin->caps = g_value_dup_boxed (value);
726       GST_OBJECT_UNLOCK (dbin);
727       break;
728     default:
729       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
730       break;
731   }
732 }
733
734 static void
735 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
736     GParamSpec * pspec)
737 {
738   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
739
740   /* FIXME : IMPLEMENT */
741   switch (prop_id) {
742     case PROP_CAPS:
743       GST_OBJECT_LOCK (dbin);
744       g_value_set_boxed (value, dbin->caps);
745       GST_OBJECT_UNLOCK (dbin);
746       break;
747     default:
748       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
749       break;
750   }
751 }
752
753 static gboolean
754 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
755     GstCaps * caps, GstDecodebin3 * dbin)
756 {
757   GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
758
759   /* If it matches our target caps, expose it */
760   if (gst_caps_can_intersect (caps, dbin->caps))
761     return FALSE;
762
763   return TRUE;
764 }
765
766 /* This method should be called whenever a STREAM_START event
767  * comes out of a given parsebin.
768  * The caller shall replace the group_id if the function returns TRUE */
769 static gboolean
770 set_input_group_id (DecodebinInput * input, guint32 * group_id)
771 {
772   GstDecodebin3 *dbin = input->dbin;
773
774   if (input->group_id != *group_id) {
775     if (input->group_id != GST_GROUP_ID_INVALID)
776       GST_WARNING_OBJECT (dbin,
777           "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
778           ") on input %p ", input->group_id, *group_id, input);
779     input->group_id = *group_id;
780   }
781
782   if (*group_id != dbin->current_group_id) {
783     if (dbin->current_group_id == GST_GROUP_ID_INVALID) {
784       GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT,
785           *group_id);
786       dbin->current_group_id = *group_id;
787     }
788     *group_id = dbin->current_group_id;
789     return TRUE;
790   }
791
792   return FALSE;
793 }
794
795 static void
796 parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
797 {
798   GstDecodebin3 *dbin = input->dbin;
799   gboolean all_drained;
800   GList *tmp;
801
802   GST_INFO_OBJECT (dbin, "input %p drained", input);
803   input->drained = TRUE;
804
805   all_drained = dbin->main_input->drained;
806   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
807     DecodebinInput *data = (DecodebinInput *) tmp->data;
808
809     all_drained &= data->drained;
810   }
811
812   if (all_drained) {
813     GST_INFO_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
814     g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
815         NULL);
816   }
817 }
818
819 /* Call with INPUT_LOCK taken */
820 static gboolean
821 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
822 {
823   gboolean set_state = FALSE;
824
825   if (input->parsebin == NULL) {
826     input->parsebin = gst_element_factory_make ("parsebin", NULL);
827     if (input->parsebin == NULL)
828       goto no_parsebin;
829     input->parsebin = gst_object_ref (input->parsebin);
830     input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
831     input->pad_added_sigid =
832         g_signal_connect (input->parsebin, "pad-added",
833         (GCallback) parsebin_pad_added_cb, input);
834     input->pad_removed_sigid =
835         g_signal_connect (input->parsebin, "pad-removed",
836         (GCallback) parsebin_pad_removed_cb, input);
837     input->drained_sigid =
838         g_signal_connect (input->parsebin, "drained",
839         (GCallback) parsebin_drained_cb, input);
840     g_signal_connect (input->parsebin, "autoplug-continue",
841         (GCallback) parsebin_autoplug_continue_cb, dbin);
842   }
843
844   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
845     /* The state lock is taken so that we ensure we are the one (de)activating
846      * parsebin. We need to do this to ensure any activation taking place in
847      * parsebin (including by elements doing upstream activation) are done
848      * within the same thread. */
849     GST_STATE_LOCK (input->parsebin);
850     gst_bin_add (GST_BIN (dbin), input->parsebin);
851     set_state = TRUE;
852   }
853
854   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
855       input->parsebin_sink);
856
857   if (set_state) {
858     gst_element_sync_state_with_parent (input->parsebin);
859     GST_STATE_UNLOCK (input->parsebin);
860   }
861
862   return TRUE;
863
864   /* ERRORS */
865 no_parsebin:
866   {
867     gst_element_post_message ((GstElement *) dbin,
868         gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
869     return FALSE;
870   }
871 }
872
873 static GstPadLinkReturn
874 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
875 {
876   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
877   GstPadLinkReturn res = GST_PAD_LINK_OK;
878   DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
879
880   g_return_val_if_fail (input, GST_PAD_LINK_REFUSED);
881
882   GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT
883       ". Creating parsebin if needed", pad);
884
885   INPUT_LOCK (dbin);
886   if (!ensure_input_parsebin (dbin, input))
887     res = GST_PAD_LINK_REFUSED;
888   INPUT_UNLOCK (dbin);
889
890   return res;
891 }
892
893 /* Drop duration query during _input_pad_unlink */
894 static GstPadProbeReturn
895 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
896     DecodebinInput * input)
897 {
898   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
899
900   if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
901     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
902     if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
903       GST_LOG_OBJECT (pad, "stop forwarding query duration");
904       ret = GST_PAD_PROBE_HANDLED;
905     }
906   }
907
908   return ret;
909 }
910
911 static void
912 gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
913 {
914   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
915   DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
916
917   g_return_if_fail (input);
918
919   GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT
920       ". Removing parsebin.", pad);
921
922   INPUT_LOCK (dbin);
923   if (input->parsebin == NULL) {
924     INPUT_UNLOCK (dbin);
925     return;
926   }
927
928   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) {
929     GstStreamCollection *collection = NULL;
930     gulong probe_id = gst_pad_add_probe (input->parsebin_sink,
931         GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
932         (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
933
934     /* Clear stream-collection corresponding to current INPUT and post new
935      * stream-collection message, if needed */
936     if (input->collection) {
937       gst_object_unref (input->collection);
938       input->collection = NULL;
939     }
940
941     SELECTION_LOCK (dbin);
942     collection = get_merged_collection (dbin);
943     if (collection && collection != dbin->collection) {
944       GstMessage *msg;
945       GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
946
947       if (dbin->collection)
948         gst_object_unref (dbin->collection);
949       dbin->collection = collection;
950       dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
951
952       msg =
953           gst_message_new_stream_collection ((GstObject *) dbin,
954           dbin->collection);
955
956       SELECTION_UNLOCK (dbin);
957       gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
958       update_requested_selection (dbin);
959     } else {
960       if (collection)
961         gst_object_unref (collection);
962       SELECTION_UNLOCK (dbin);
963     }
964
965     gst_bin_remove (GST_BIN (dbin), input->parsebin);
966     gst_element_set_state (input->parsebin, GST_STATE_NULL);
967     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
968     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
969     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
970     gst_pad_remove_probe (input->parsebin_sink, probe_id);
971     gst_object_unref (input->parsebin);
972     gst_object_unref (input->parsebin_sink);
973
974     input->parsebin = NULL;
975     input->parsebin_sink = NULL;
976
977     if (!input->is_main) {
978       dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
979       free_input_async (dbin, input);
980     }
981   }
982   INPUT_UNLOCK (dbin);
983   return;
984 }
985
986 static void
987 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
988 {
989   GST_DEBUG ("Freeing input %p", input);
990   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
991   gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
992   if (input->parsebin) {
993     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
994     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
995     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
996     gst_element_set_state (input->parsebin, GST_STATE_NULL);
997     gst_object_unref (input->parsebin);
998     gst_object_unref (input->parsebin_sink);
999   }
1000   if (input->collection)
1001     gst_object_unref (input->collection);
1002   g_free (input);
1003 }
1004
1005 static void
1006 free_input_async (GstDecodebin3 * dbin, DecodebinInput * input)
1007 {
1008   GST_LOG_OBJECT (dbin, "pushing input %p on thread pool to free", input);
1009   gst_element_call_async (GST_ELEMENT_CAST (dbin),
1010       (GstElementCallAsyncFunc) free_input, input, NULL);
1011 }
1012
1013 static gboolean
1014 sink_query_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstQuery * query)
1015 {
1016   DecodebinInput *input =
1017       g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1018
1019   g_return_val_if_fail (input, FALSE);
1020
1021   GST_DEBUG_OBJECT (sinkpad, "query %" GST_PTR_FORMAT, query);
1022
1023   /* We accept any caps, since we will reconfigure ourself internally if the new
1024    * stream is incompatible */
1025   if (GST_QUERY_TYPE (query) == GST_QUERY_ACCEPT_CAPS) {
1026     GST_DEBUG_OBJECT (dbin, "Accepting ACCEPT_CAPS query");
1027     gst_query_set_accept_caps_result (query, TRUE);
1028     return TRUE;
1029   }
1030   return gst_pad_query_default (sinkpad, GST_OBJECT (dbin), query);
1031 }
1032
1033 static gboolean
1034 sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
1035 {
1036   DecodebinInput *input =
1037       g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1038
1039   g_return_val_if_fail (input, FALSE);
1040
1041   GST_DEBUG_OBJECT (sinkpad, "event %" GST_PTR_FORMAT, event);
1042
1043   switch (GST_EVENT_TYPE (event)) {
1044     case GST_EVENT_STREAM_START:
1045     {
1046       GstQuery *q = gst_query_new_selectable ();
1047
1048       /* Query whether upstream can handle stream selection or not */
1049       if (gst_pad_peer_query (sinkpad, q)) {
1050         gst_query_parse_selectable (q, &input->upstream_selected);
1051         GST_DEBUG_OBJECT (sinkpad, "Upstream is selectable : %d",
1052             input->upstream_selected);
1053       } else {
1054         input->upstream_selected = FALSE;
1055         GST_DEBUG_OBJECT (sinkpad, "Upstream does not handle SELECTABLE query");
1056       }
1057       gst_query_unref (q);
1058
1059       /* FIXME : We force `decodebin3` to upstream selection mode if *any* of the
1060          inputs is. This means things might break if there's a mix */
1061       if (input->upstream_selected)
1062         dbin->upstream_selected = TRUE;
1063       break;
1064     }
1065     case GST_EVENT_CAPS:
1066     {
1067       GST_DEBUG_OBJECT (sinkpad,
1068           "New caps, checking if they are compatible with existing parsebin");
1069       if (input->parsebin_sink) {
1070         GstCaps *newcaps = NULL;
1071
1072         gst_event_parse_caps (event, &newcaps);
1073         GST_DEBUG_OBJECT (sinkpad, "new caps %" GST_PTR_FORMAT, newcaps);
1074
1075         if (newcaps
1076             && !gst_pad_query_accept_caps (input->parsebin_sink, newcaps)) {
1077           GST_DEBUG_OBJECT (sinkpad,
1078               "Parsebin doesn't accept the new caps %" GST_PTR_FORMAT, newcaps);
1079           GST_STATE_LOCK (dbin);
1080           /* Reset parsebin so that it reconfigures itself for the new stream format */
1081           gst_element_set_state (input->parsebin, GST_STATE_NULL);
1082           gst_element_sync_state_with_parent (input->parsebin);
1083           GST_STATE_UNLOCK (dbin);
1084         } else {
1085           GST_DEBUG_OBJECT (sinkpad, "Parsebin accepts new caps");
1086         }
1087       }
1088       break;
1089     }
1090     default:
1091       break;
1092   }
1093
1094   /* Chain to parent function */
1095   return gst_pad_event_default (sinkpad, GST_OBJECT (dbin), event);
1096 }
1097
1098 /* Call with INPUT_LOCK taken */
1099 static DecodebinInput *
1100 create_new_input (GstDecodebin3 * dbin, gboolean main)
1101 {
1102   DecodebinInput *input;
1103
1104   input = g_new0 (DecodebinInput, 1);
1105   input->dbin = dbin;
1106   input->is_main = main;
1107   input->group_id = GST_GROUP_ID_INVALID;
1108   if (main)
1109     input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
1110   else {
1111     gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
1112     input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
1113     g_free (pad_name);
1114   }
1115   input->upstream_selected = FALSE;
1116   g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
1117   gst_pad_set_event_function (input->ghost_sink,
1118       (GstPadEventFunction) sink_event_function);
1119   gst_pad_set_query_function (input->ghost_sink,
1120       (GstPadQueryFunction) sink_query_function);
1121   gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
1122   gst_pad_set_unlink_function (input->ghost_sink,
1123       gst_decodebin3_input_pad_unlink);
1124
1125   gst_pad_set_active (input->ghost_sink, TRUE);
1126   gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
1127
1128   return input;
1129
1130 }
1131
1132 static GstPad *
1133 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
1134     const gchar * name, const GstCaps * caps)
1135 {
1136   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1137   DecodebinInput *input;
1138   GstPad *res = NULL;
1139
1140   /* We are ignoring names for the time being, not sure it makes any sense
1141    * within the context of decodebin3 ... */
1142   input = create_new_input (dbin, FALSE);
1143   if (input) {
1144     INPUT_LOCK (dbin);
1145     dbin->other_inputs = g_list_append (dbin->other_inputs, input);
1146     res = input->ghost_sink;
1147     INPUT_UNLOCK (dbin);
1148   }
1149
1150   return res;
1151 }
1152
1153 /* Must be called with factories lock! */
1154 static void
1155 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
1156 {
1157   guint cookie;
1158
1159   cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
1160   if (!dbin->factories || dbin->factories_cookie != cookie) {
1161     GList *tmp;
1162     if (dbin->factories)
1163       gst_plugin_feature_list_free (dbin->factories);
1164     if (dbin->decoder_factories)
1165       g_list_free (dbin->decoder_factories);
1166     if (dbin->decodable_factories)
1167       g_list_free (dbin->decodable_factories);
1168     dbin->factories =
1169         gst_element_factory_list_get_elements
1170         (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
1171     dbin->factories =
1172         g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
1173     dbin->factories_cookie = cookie;
1174
1175     /* Filter decoder and other decodables */
1176     dbin->decoder_factories = NULL;
1177     dbin->decodable_factories = NULL;
1178     for (tmp = dbin->factories; tmp; tmp = tmp->next) {
1179       GstElementFactory *fact = (GstElementFactory *) tmp->data;
1180       if (gst_element_factory_list_is_type (fact,
1181               GST_ELEMENT_FACTORY_TYPE_DECODER))
1182         dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
1183       else
1184         dbin->decodable_factories =
1185             g_list_append (dbin->decodable_factories, fact);
1186     }
1187   }
1188 }
1189
1190 /* Must be called with appropriate lock if list is a protected variable */
1191 static const gchar *
1192 stream_in_list (GList * list, const gchar * sid)
1193 {
1194   GList *tmp;
1195
1196 #if EXTRA_DEBUG
1197   for (tmp = list; tmp; tmp = tmp->next) {
1198     gchar *osid = (gchar *) tmp->data;
1199     GST_DEBUG ("Checking %s against %s", sid, osid);
1200   }
1201 #endif
1202
1203   for (tmp = list; tmp; tmp = tmp->next) {
1204     const gchar *osid = (gchar *) tmp->data;
1205     if (!g_strcmp0 (sid, osid))
1206       return osid;
1207   }
1208
1209   return NULL;
1210 }
1211
1212 static gboolean
1213 stream_list_equal (GList * lista, GList * listb)
1214 {
1215   GList *tmp;
1216
1217   if (g_list_length (lista) != g_list_length (listb))
1218     return FALSE;
1219
1220   for (tmp = lista; tmp; tmp = tmp->next) {
1221     gchar *osid = tmp->data;
1222     if (!stream_in_list (listb, osid))
1223       return FALSE;
1224   }
1225
1226   return TRUE;
1227 }
1228
1229 static void
1230 update_requested_selection (GstDecodebin3 * dbin)
1231 {
1232   guint i, nb;
1233   GList *tmp = NULL;
1234   gboolean all_user_selected = TRUE;
1235   GstStreamType used_types = 0;
1236   GstStreamCollection *collection;
1237
1238   /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1239    *  the switch handler will take care of the pending selection */
1240   SELECTION_LOCK (dbin);
1241   if (dbin->pending_select_streams) {
1242     GST_DEBUG_OBJECT (dbin,
1243         "No need to create pending selection, SELECT_STREAMS underway");
1244     goto beach;
1245   }
1246
1247   collection = dbin->collection;
1248   if (G_UNLIKELY (collection == NULL)) {
1249     GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
1250     goto beach;
1251   }
1252   nb = gst_stream_collection_get_size (collection);
1253
1254   /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1255   GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1256
1257   /* 3. If not, check if we already have some of the streams in the
1258    * existing active/requested selection */
1259   for (i = 0; i < nb; i++) {
1260     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1261     const gchar *sid = gst_stream_get_stream_id (stream);
1262     gint request = -1;
1263     /* Fire select-stream signal to see if outside components want to
1264      * hint at which streams should be selected */
1265     g_signal_emit (G_OBJECT (dbin),
1266         gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1267         &request);
1268     GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1269
1270     if (request == -1)
1271       all_user_selected = FALSE;
1272     if (request == 1 || (request == -1
1273             && (stream_in_list (dbin->requested_selection, sid)
1274                 || stream_in_list (dbin->active_selection, sid)))) {
1275       GstStreamType curtype = gst_stream_get_stream_type (stream);
1276       if (request == 1)
1277         GST_DEBUG_OBJECT (dbin,
1278             "Using stream requested by 'select-stream' signal : %s", sid);
1279       else
1280         GST_DEBUG_OBJECT (dbin,
1281             "Re-using stream already present in requested or active selection : %s",
1282             sid);
1283       tmp = g_list_append (tmp, (gchar *) sid);
1284       used_types |= curtype;
1285     }
1286   }
1287
1288   /* 4. If the user didn't explicitly selected all streams, match one stream of each type */
1289   if (!all_user_selected && dbin->select_streams_seqnum == GST_SEQNUM_INVALID) {
1290     for (i = 0; i < nb; i++) {
1291       GstStream *stream = gst_stream_collection_get_stream (collection, i);
1292       GstStreamType curtype = gst_stream_get_stream_type (stream);
1293       if (curtype != GST_STREAM_TYPE_UNKNOWN && !(used_types & curtype)) {
1294         const gchar *sid = gst_stream_get_stream_id (stream);
1295         GST_DEBUG_OBJECT (dbin,
1296             "Automatically selecting stream '%s' of type %s", sid,
1297             gst_stream_type_get_name (curtype));
1298         tmp = g_list_append (tmp, (gchar *) sid);
1299         used_types |= curtype;
1300       }
1301     }
1302   }
1303
1304 beach:
1305   if (stream_list_equal (tmp, dbin->requested_selection)) {
1306     /* If the selection is equal, there is nothign to do */
1307     GST_DEBUG_OBJECT (dbin, "Dropping duplicate selection");
1308     g_list_free (tmp);
1309     tmp = NULL;
1310   }
1311
1312   if (tmp) {
1313     /* Finally set the requested selection */
1314     if (dbin->requested_selection) {
1315       GST_FIXME_OBJECT (dbin,
1316           "Replacing non-NULL requested_selection, what should we do ??");
1317       g_list_free_full (dbin->requested_selection, g_free);
1318     }
1319     dbin->requested_selection =
1320         g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1321     dbin->selection_updated = TRUE;
1322     g_list_free (tmp);
1323   }
1324   SELECTION_UNLOCK (dbin);
1325 }
1326
1327 /* sort_streams:
1328  * GCompareFunc to use with lists of GstStream.
1329  * Sorts GstStreams by stream type and SELECT flag and stream-id
1330  * First video, then audio, then others.
1331  *
1332  * Return: negative if a<b, 0 if a==b, positive if a>b
1333  */
1334 static gint
1335 sort_streams (GstStream * sa, GstStream * sb)
1336 {
1337   GstStreamType typea, typeb;
1338   GstStreamFlags flaga, flagb;
1339   const gchar *ida, *idb;
1340   gint ret = 0;
1341
1342   typea = gst_stream_get_stream_type (sa);
1343   typeb = gst_stream_get_stream_type (sb);
1344
1345   GST_LOG ("sa(%s), sb(%s)", gst_stream_get_stream_id (sa),
1346       gst_stream_get_stream_id (sb));
1347
1348   /* Sort by stream type. First video, then audio, then others(text, container, unknown) */
1349   if (typea != typeb) {
1350     if (typea & GST_STREAM_TYPE_VIDEO)
1351       ret = -1;
1352     else if (typea & GST_STREAM_TYPE_AUDIO)
1353       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)) ? -1 : 1;
1354     else if (typea & GST_STREAM_TYPE_TEXT)
1355       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)
1356           && !(typeb & GST_STREAM_TYPE_AUDIO)) ? -1 : 1;
1357     else if (typea & GST_STREAM_TYPE_CONTAINER)
1358       ret = (typeb & GST_STREAM_TYPE_UNKNOWN) ? -1 : 1;
1359     else
1360       ret = 1;
1361
1362     if (ret != 0) {
1363       GST_LOG ("Sort by stream-type: %d", ret);
1364       return ret;
1365     }
1366   }
1367
1368   /* Sort by SELECT flag, if stream type is same. */
1369   flaga = gst_stream_get_stream_flags (sa);
1370   flagb = gst_stream_get_stream_flags (sb);
1371
1372   ret =
1373       (flaga & GST_STREAM_FLAG_SELECT) ? ((flagb & GST_STREAM_FLAG_SELECT) ? 0 :
1374       -1) : ((flagb & GST_STREAM_FLAG_SELECT) ? 1 : 0);
1375
1376   if (ret != 0) {
1377     GST_LOG ("Sort by SELECT flag: %d", ret);
1378     return ret;
1379   }
1380
1381   /* Sort by stream-id, if otherwise the same. */
1382   ida = gst_stream_get_stream_id (sa);
1383   idb = gst_stream_get_stream_id (sb);
1384   ret = g_strcmp0 (ida, idb);
1385
1386   GST_LOG ("Sort by stream-id: %d", ret);
1387
1388   return ret;
1389 }
1390
1391 /* Call with INPUT_LOCK taken */
1392 static GstStreamCollection *
1393 get_merged_collection (GstDecodebin3 * dbin)
1394 {
1395   gboolean needs_merge = FALSE;
1396   GstStreamCollection *res = NULL;
1397   GList *tmp;
1398   GList *unsorted_streams = NULL;
1399   guint i, nb_stream;
1400
1401   /* First check if we need to do a merge or just return the only collection */
1402   res = dbin->main_input->collection;
1403
1404   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1405     DecodebinInput *input = (DecodebinInput *) tmp->data;
1406     if (input->collection) {
1407       if (res) {
1408         needs_merge = TRUE;
1409         break;
1410       }
1411       res = input->collection;
1412     }
1413   }
1414
1415   if (!needs_merge) {
1416     GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1417     return res ? gst_object_ref (res) : NULL;
1418   }
1419
1420   /* We really need to create a new collection */
1421   /* FIXME : Some numbering scheme maybe ?? */
1422   res = gst_stream_collection_new ("decodebin3");
1423   if (dbin->main_input->collection) {
1424     nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1425     GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1426     for (i = 0; i < nb_stream; i++) {
1427       GstStream *stream =
1428           gst_stream_collection_get_stream (dbin->main_input->collection, i);
1429       unsorted_streams = g_list_append (unsorted_streams, stream);
1430     }
1431   }
1432
1433   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1434     DecodebinInput *input = (DecodebinInput *) tmp->data;
1435     GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1436         input->collection);
1437     if (input->collection) {
1438       nb_stream = gst_stream_collection_get_size (input->collection);
1439       GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1440       for (i = 0; i < nb_stream; i++) {
1441         GstStream *stream =
1442             gst_stream_collection_get_stream (input->collection, i);
1443         /* Only add if not already present in the list */
1444         if (!g_list_find (unsorted_streams, stream))
1445           unsorted_streams = g_list_append (unsorted_streams, stream);
1446       }
1447     }
1448   }
1449
1450   /* re-order streams : video, then audio, then others */
1451   unsorted_streams =
1452       g_list_sort (unsorted_streams, (GCompareFunc) sort_streams);
1453   for (tmp = unsorted_streams; tmp; tmp = tmp->next) {
1454     GstStream *stream = (GstStream *) tmp->data;
1455     GST_DEBUG_OBJECT (dbin, "Adding #stream(%s) to collection",
1456         gst_stream_get_stream_id (stream));
1457     gst_stream_collection_add_stream (res, gst_object_ref (stream));
1458   }
1459
1460   if (unsorted_streams)
1461     g_list_free (unsorted_streams);
1462
1463   return res;
1464 }
1465
1466 /* Call with INPUT_LOCK taken */
1467 static DecodebinInput *
1468 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1469 {
1470   DecodebinInput *input = NULL;
1471   GstElement *parent = gst_object_ref (child);
1472   GList *tmp;
1473
1474   do {
1475     GstElement *next_parent;
1476
1477     GST_DEBUG_OBJECT (dbin, "parent %s",
1478         parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1479
1480     if (parent == dbin->main_input->parsebin) {
1481       input = dbin->main_input;
1482       break;
1483     }
1484     for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1485       DecodebinInput *cur = (DecodebinInput *) tmp->data;
1486       if (parent == cur->parsebin) {
1487         input = cur;
1488         break;
1489       }
1490     }
1491     next_parent = (GstElement *) gst_element_get_parent (parent);
1492     gst_object_unref (parent);
1493     parent = next_parent;
1494
1495   } while (parent && parent != (GstElement *) dbin);
1496
1497   if (parent)
1498     gst_object_unref (parent);
1499
1500   return input;
1501 }
1502
1503 static const gchar *
1504 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1505 {
1506   guint i, len;
1507
1508   if (dbin->collection == NULL)
1509     return NULL;
1510   len = gst_stream_collection_get_size (dbin->collection);
1511   for (i = 0; i < len; i++) {
1512     GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1513     const gchar *osid = gst_stream_get_stream_id (stream);
1514     if (!g_strcmp0 (sid, osid))
1515       return osid;
1516   }
1517
1518   return NULL;
1519 }
1520
1521 /* Call with INPUT_LOCK taken */
1522 static void
1523 handle_stream_collection (GstDecodebin3 * dbin,
1524     GstStreamCollection * collection, DecodebinInput * input)
1525 {
1526 #ifndef GST_DISABLE_GST_DEBUG
1527   const gchar *upstream_id;
1528   guint i;
1529 #endif
1530   if (!input) {
1531     GST_DEBUG_OBJECT (dbin,
1532         "Couldn't find corresponding input, most likely shutting down");
1533     return;
1534   }
1535
1536   /* Replace collection in input */
1537   if (input->collection)
1538     gst_object_unref (input->collection);
1539   input->collection = gst_object_ref (collection);
1540   GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1541       input);
1542
1543   /* Merge collection if needed */
1544   collection = get_merged_collection (dbin);
1545
1546 #ifndef GST_DISABLE_GST_DEBUG
1547   /* Just some debugging */
1548   upstream_id = gst_stream_collection_get_upstream_id (collection);
1549   GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1550   GST_DEBUG ("From input %p", input);
1551   GST_DEBUG ("  %d streams", gst_stream_collection_get_size (collection));
1552   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1553     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1554     GstTagList *taglist;
1555     GstCaps *caps;
1556
1557     GST_DEBUG ("   Stream '%s'", gst_stream_get_stream_id (stream));
1558     GST_DEBUG ("     type  : %s",
1559         gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1560     GST_DEBUG ("     flags : 0x%x", gst_stream_get_stream_flags (stream));
1561     taglist = gst_stream_get_tags (stream);
1562     GST_DEBUG ("     tags  : %" GST_PTR_FORMAT, taglist);
1563     caps = gst_stream_get_caps (stream);
1564     GST_DEBUG ("     caps  : %" GST_PTR_FORMAT, caps);
1565     if (taglist)
1566       gst_tag_list_unref (taglist);
1567     if (caps)
1568       gst_caps_unref (caps);
1569   }
1570 #endif
1571
1572   /* Store collection for later usage */
1573   SELECTION_LOCK (dbin);
1574   if (dbin->collection == NULL) {
1575     dbin->collection = collection;
1576   } else {
1577     /* We need to check who emitted this collection (the owner).
1578      * If we already had a collection from that user, this one is an update,
1579      * that is to say that we need to figure out how we are going to re-use
1580      * the streams/slot */
1581     GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1582     /* FIXME : When do we switch from pending collection to active collection ?
1583      * When all streams from active collection are drained in multiqueue output ? */
1584     gst_object_unref (dbin->collection);
1585     dbin->collection = collection;
1586     /* dbin->pending_collection = */
1587     /*     g_list_append (dbin->pending_collection, collection); */
1588   }
1589   dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1590   SELECTION_UNLOCK (dbin);
1591 }
1592
1593 /* Must be called with the selection lock taken */
1594 static void
1595 gst_decodebin3_update_min_interleave (GstDecodebin3 * dbin)
1596 {
1597   GstClockTime max_latency = GST_CLOCK_TIME_NONE;
1598   GList *tmp;
1599
1600   GST_DEBUG_OBJECT (dbin, "Recalculating max latency of decoders");
1601   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1602     DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1603     if (GST_CLOCK_TIME_IS_VALID (out->decoder_latency)) {
1604       if (max_latency == GST_CLOCK_TIME_NONE
1605           || out->decoder_latency > max_latency)
1606         max_latency = out->decoder_latency;
1607     }
1608   }
1609   GST_DEBUG_OBJECT (dbin, "max latency of all decoders: %" GST_TIME_FORMAT,
1610       GST_TIME_ARGS (max_latency));
1611
1612   if (!GST_CLOCK_TIME_IS_VALID (max_latency))
1613     return;
1614
1615   /* Make sure we keep an extra overhead */
1616   max_latency += 100 * GST_MSECOND;
1617   if (max_latency == dbin->current_mq_min_interleave)
1618     return;
1619
1620   dbin->current_mq_min_interleave = max_latency;
1621   GST_DEBUG_OBJECT (dbin, "Setting mq min-interleave to %" GST_TIME_FORMAT,
1622       GST_TIME_ARGS (dbin->current_mq_min_interleave));
1623   g_object_set (dbin->multiqueue, "min-interleave-time",
1624       dbin->current_mq_min_interleave, NULL);
1625 }
1626
1627 static void
1628 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1629 {
1630   GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1631   gboolean posting_collection = FALSE;
1632
1633   GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1634
1635   switch (GST_MESSAGE_TYPE (message)) {
1636     case GST_MESSAGE_STREAM_COLLECTION:
1637     {
1638       GstStreamCollection *collection = NULL;
1639       DecodebinInput *input;
1640
1641       INPUT_LOCK (dbin);
1642       input =
1643           find_message_parsebin (dbin,
1644           (GstElement *) GST_MESSAGE_SRC (message));
1645       if (input == NULL) {
1646         GST_DEBUG_OBJECT (dbin,
1647             "Couldn't find corresponding input, most likely shutting down");
1648         INPUT_UNLOCK (dbin);
1649         break;
1650       }
1651       if (input->upstream_selected) {
1652         GST_DEBUG_OBJECT (dbin,
1653             "Upstream handles selection, not using/forwarding collection");
1654         INPUT_UNLOCK (dbin);
1655         goto drop_message;
1656       }
1657       gst_message_parse_stream_collection (message, &collection);
1658       if (collection) {
1659         handle_stream_collection (dbin, collection, input);
1660         posting_collection = TRUE;
1661       }
1662       INPUT_UNLOCK (dbin);
1663
1664       SELECTION_LOCK (dbin);
1665       if (dbin->collection) {
1666         /* Replace collection message, we most likely aggregated it */
1667         GstMessage *new_msg;
1668         new_msg =
1669             gst_message_new_stream_collection ((GstObject *) dbin,
1670             dbin->collection);
1671         gst_message_unref (message);
1672         message = new_msg;
1673       }
1674       SELECTION_UNLOCK (dbin);
1675
1676       if (collection)
1677         gst_object_unref (collection);
1678       break;
1679     }
1680     case GST_MESSAGE_LATENCY:
1681     {
1682       GList *tmp;
1683       /* Check if this is from one of our decoders */
1684       SELECTION_LOCK (dbin);
1685       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1686         DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1687         if (out->decoder == (GstElement *) GST_MESSAGE_SRC (message)) {
1688           GstClockTime min, max;
1689           if (GST_IS_VIDEO_DECODER (out->decoder)) {
1690             gst_video_decoder_get_latency (GST_VIDEO_DECODER (out->decoder),
1691                 &min, &max);
1692             GST_DEBUG_OBJECT (dbin,
1693                 "Got latency update from one of our decoders. min: %"
1694                 GST_TIME_FORMAT " max: %" GST_TIME_FORMAT, GST_TIME_ARGS (min),
1695                 GST_TIME_ARGS (max));
1696             out->decoder_latency = min;
1697             /* Trigger recalculation */
1698             gst_decodebin3_update_min_interleave (dbin);
1699           }
1700           break;
1701         }
1702       }
1703       SELECTION_UNLOCK (dbin);
1704     }
1705     default:
1706       break;
1707   }
1708
1709   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1710
1711   if (posting_collection) {
1712     /* Figure out a selection for that collection */
1713     update_requested_selection (dbin);
1714   }
1715
1716   return;
1717
1718 drop_message:
1719   {
1720     GST_DEBUG_OBJECT (bin, "dropping message");
1721     gst_message_unref (message);
1722   }
1723 }
1724
1725 static DecodebinOutputStream *
1726 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1727 {
1728   GList *tmp;
1729   GstStreamType stype = gst_stream_get_stream_type (stream);
1730
1731   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1732     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1733     if (output->type == stype && output->slot && output->slot->active_stream) {
1734       GstStream *tstream = output->slot->active_stream;
1735       if (!stream_in_list (dbin->requested_selection,
1736               (gchar *) gst_stream_get_stream_id (tstream))) {
1737         return output;
1738       }
1739     }
1740   }
1741
1742   return NULL;
1743 }
1744
1745 /* Give a certain slot, figure out if it should be linked to an
1746  * output stream
1747  * CALL WITH SELECTION LOCK TAKEN !*/
1748 static DecodebinOutputStream *
1749 get_output_for_slot (MultiQueueSlot * slot)
1750 {
1751   GstDecodebin3 *dbin = slot->dbin;
1752   DecodebinOutputStream *output = NULL;
1753   const gchar *stream_id;
1754   GstCaps *caps;
1755   gchar *id_in_list = NULL;
1756
1757   /* If we already have a configured output, just use it */
1758   if (slot->output != NULL)
1759     return slot->output;
1760
1761   /*
1762    * FIXME
1763    *
1764    * This method needs to be split into multiple parts
1765    *
1766    * 1) Figure out whether stream should be exposed or not
1767    *   This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1768    *   in the default stream attribution
1769    *
1770    * 2) Figure out whether an output stream should be created, whether
1771    *   we can re-use the output stream already linked to the slot, or
1772    *   whether we need to get re-assigned another (currently used) output
1773    *   stream.
1774    */
1775
1776   stream_id = gst_stream_get_stream_id (slot->active_stream);
1777   caps = gst_stream_get_caps (slot->active_stream);
1778   GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
1779   gst_caps_unref (caps);
1780
1781   /* 0. Emit autoplug-continue signal for pending caps ? */
1782   GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
1783
1784   /* 1. if in EXPOSE_ALL_MODE, just accept */
1785   GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
1786
1787 #if 0
1788   /* FIXME : The idea around this was to avoid activating a stream for
1789    *     which we have no decoder. Unfortunately it is way too
1790    *     expensive. Need to figure out a better solution */
1791   /* 2. Is there a potential decoder (if one is required) */
1792   if (!gst_caps_can_intersect (caps, dbin->caps)
1793       && !have_factory (dbin, (GstCaps *) caps,
1794           GST_ELEMENT_FACTORY_TYPE_DECODER)) {
1795     GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
1796         caps);
1797     SELECTION_UNLOCK (dbin);
1798     gst_element_post_message (GST_ELEMENT_CAST (dbin),
1799         gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1800     SELECTION_LOCK (dbin);
1801     return NULL;
1802   }
1803 #endif
1804
1805   /* 3. In default mode check if we should expose */
1806   id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
1807   if (id_in_list || dbin->upstream_selected) {
1808     /* Check if we can steal an existing output stream we could re-use.
1809      * that is:
1810      * * an output stream whose slot->stream is not in requested
1811      * * and is of the same type as this stream
1812      */
1813     output = find_free_compatible_output (dbin, slot->active_stream);
1814     if (output) {
1815       /* Move this output from its current slot to this slot */
1816       dbin->to_activate =
1817           g_list_append (dbin->to_activate, (gchar *) stream_id);
1818       dbin->requested_selection =
1819           g_list_remove (dbin->requested_selection, id_in_list);
1820       g_free (id_in_list);
1821       SELECTION_UNLOCK (dbin);
1822       gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1823           (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
1824       SELECTION_LOCK (dbin);
1825       return NULL;
1826     }
1827
1828     output = create_output_stream (dbin, slot->type);
1829     output->slot = slot;
1830     GST_DEBUG ("Linking slot %p to new output %p", slot, output);
1831     slot->output = output;
1832     GST_DEBUG ("Adding '%s' to active_selection", stream_id);
1833     dbin->active_selection =
1834         g_list_append (dbin->active_selection, (gchar *) g_strdup (stream_id));
1835   } else
1836     GST_DEBUG ("Not creating any output for slot %p", slot);
1837
1838   return output;
1839 }
1840
1841 /* Returns SELECTED_STREAMS message if active_selection is equal to
1842  * requested_selection, else NULL.
1843  * Must be called with LOCK taken */
1844 static GstMessage *
1845 is_selection_done (GstDecodebin3 * dbin)
1846 {
1847   GList *tmp;
1848   GstMessage *msg;
1849
1850   if (!dbin->selection_updated)
1851     return NULL;
1852
1853   GST_LOG_OBJECT (dbin, "Checking");
1854
1855   if (dbin->to_activate != NULL) {
1856     GST_DEBUG ("Still have streams to activate");
1857     return NULL;
1858   }
1859   for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
1860     GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
1861     if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
1862       GST_DEBUG ("Not in active selection, returning");
1863       return NULL;
1864     }
1865   }
1866
1867   GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
1868
1869   /* We are completely active */
1870   msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
1871   if (dbin->select_streams_seqnum != GST_SEQNUM_INVALID) {
1872     gst_message_set_seqnum (msg, dbin->select_streams_seqnum);
1873   }
1874   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1875     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1876     if (output->slot) {
1877       GST_DEBUG_OBJECT (dbin, "Adding stream %s",
1878           gst_stream_get_stream_id (output->slot->active_stream));
1879
1880       gst_message_streams_selected_add (msg, output->slot->active_stream);
1881     } else
1882       GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
1883   }
1884   dbin->selection_updated = FALSE;
1885   return msg;
1886 }
1887
1888 /* Must be called with SELECTION_LOCK taken */
1889 static void
1890 check_all_slot_for_eos (GstDecodebin3 * dbin, GstEvent * ev)
1891 {
1892   gboolean all_drained = TRUE;
1893   GList *iter;
1894
1895   GST_DEBUG_OBJECT (dbin, "check slot for eos");
1896
1897   for (iter = dbin->slots; iter; iter = iter->next) {
1898     MultiQueueSlot *slot = iter->data;
1899
1900     if (!slot->output)
1901       continue;
1902
1903     if (slot->is_drained) {
1904       GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
1905       continue;
1906     }
1907
1908     all_drained = FALSE;
1909     break;
1910   }
1911
1912   if (all_drained) {
1913     INPUT_LOCK (dbin);
1914     if (!pending_pads_are_eos (dbin->main_input))
1915       all_drained = FALSE;
1916
1917     if (all_drained) {
1918       for (iter = dbin->other_inputs; iter; iter = iter->next) {
1919         if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
1920           all_drained = FALSE;
1921           break;
1922         }
1923       }
1924     }
1925     INPUT_UNLOCK (dbin);
1926   }
1927
1928   if (all_drained) {
1929     GST_DEBUG_OBJECT (dbin,
1930         "All active slots are drained, and no pending input, push EOS");
1931
1932     for (iter = dbin->input_streams; iter; iter = iter->next) {
1933       DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
1934       GstPad *peer = gst_pad_get_peer (input->srcpad);
1935
1936       /* Send EOS to all slots */
1937       if (peer) {
1938         GstEvent *stream_start, *eos;
1939
1940         stream_start =
1941             gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
1942
1943         /* First forward a custom STREAM_START event to reset the EOS status (if any) */
1944         if (stream_start) {
1945           GstStructure *s;
1946           GstEvent *custom_stream_start = gst_event_copy (stream_start);
1947           gst_event_unref (stream_start);
1948           s = (GstStructure *) gst_event_get_structure (custom_stream_start);
1949           gst_structure_set (s, "decodebin3-flushing-stream-start",
1950               G_TYPE_BOOLEAN, TRUE, NULL);
1951           gst_pad_send_event (peer, custom_stream_start);
1952         }
1953
1954         eos = gst_event_new_eos ();
1955         gst_event_set_seqnum (eos, gst_event_get_seqnum (ev));
1956         gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos),
1957             CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA,
1958             NULL);
1959         gst_pad_send_event (peer, eos);
1960         gst_object_unref (peer);
1961       } else
1962         GST_DEBUG_OBJECT (dbin, "no output");
1963     }
1964   }
1965 }
1966
1967 static GstPadProbeReturn
1968 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
1969     MultiQueueSlot * slot)
1970 {
1971   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
1972   GstDecodebin3 *dbin = slot->dbin;
1973
1974   if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
1975     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
1976
1977     GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
1978     switch (GST_EVENT_TYPE (ev)) {
1979       case GST_EVENT_STREAM_START:
1980       {
1981         GstStream *stream = NULL;
1982         const GstStructure *s = gst_event_get_structure (ev);
1983
1984         /* Drop STREAM_START events used to cleanup multiqueue */
1985         if (s
1986             && gst_structure_has_field (s,
1987                 "decodebin3-flushing-stream-start")) {
1988           ret = GST_PAD_PROBE_HANDLED;
1989           gst_event_unref (ev);
1990           break;
1991         }
1992
1993         gst_event_parse_stream (ev, &stream);
1994         if (stream == NULL) {
1995           GST_ERROR_OBJECT (pad,
1996               "Got a STREAM_START event without a GstStream");
1997           break;
1998         }
1999         slot->is_drained = FALSE;
2000         GST_DEBUG_OBJECT (pad, "Stream Start '%s'",
2001             gst_stream_get_stream_id (stream));
2002         if (slot->active_stream == NULL) {
2003           slot->active_stream = stream;
2004         } else if (slot->active_stream != stream) {
2005           GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
2006               gst_stream_get_stream_id (slot->active_stream),
2007               gst_stream_get_stream_id (stream));
2008           gst_object_unref (slot->active_stream);
2009           slot->active_stream = stream;
2010         } else
2011           gst_object_unref (stream);
2012 #if 0                           /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
2013         {
2014           gboolean is_active, is_requested;
2015           /* Quick check to see if we're in the current selection */
2016           /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
2017           SELECTION_LOCK (dbin);
2018           GST_DEBUG_OBJECT (dbin, "Checking active selection");
2019           is_active = stream_in_list (dbin->active_selection, stream_id);
2020           GST_DEBUG_OBJECT (dbin, "Checking requested selection");
2021           is_requested = stream_in_list (dbin->requested_selection, stream_id);
2022           SELECTION_UNLOCK (dbin);
2023           if (is_active)
2024             GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
2025                 slot->output);
2026           if (is_requested)
2027             GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
2028                 slot->output);
2029           else if (slot->output) {
2030             GST_DEBUG_OBJECT (pad,
2031                 "Slot needs to be deactivated ? It's no longer in requested selection");
2032           } else if (!is_active)
2033             GST_DEBUG_OBJECT (pad,
2034                 "Slot in neither active nor requested selection");
2035         }
2036 #endif
2037       }
2038         break;
2039       case GST_EVENT_CAPS:
2040       {
2041         /* Configure the output slot if needed */
2042         DecodebinOutputStream *output;
2043         GstMessage *msg = NULL;
2044         SELECTION_LOCK (dbin);
2045         output = get_output_for_slot (slot);
2046         if (output) {
2047           reconfigure_output_stream (output, slot);
2048           msg = is_selection_done (dbin);
2049         }
2050         SELECTION_UNLOCK (dbin);
2051         if (msg)
2052           gst_element_post_message ((GstElement *) slot->dbin, msg);
2053       }
2054         break;
2055       case GST_EVENT_EOS:
2056       {
2057         gboolean was_drained = slot->is_drained;
2058         slot->is_drained = TRUE;
2059
2060         /* Custom EOS handling first */
2061         if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2062                 CUSTOM_EOS_QUARK)) {
2063           /* remove custom-eos */
2064           ev = gst_event_make_writable (ev);
2065           GST_PAD_PROBE_INFO_DATA (info) = ev;
2066           gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
2067               CUSTOM_EOS_QUARK, NULL, NULL);
2068
2069           GST_LOG_OBJECT (pad, "Received custom EOS");
2070           ret = GST_PAD_PROBE_HANDLED;
2071           SELECTION_LOCK (dbin);
2072           if (slot->input == NULL) {
2073             GST_DEBUG_OBJECT (pad,
2074                 "Got custom-eos from null input stream, remove output stream");
2075             /* Remove the output */
2076             if (slot->output) {
2077               DecodebinOutputStream *output = slot->output;
2078               dbin->output_streams =
2079                   g_list_remove (dbin->output_streams, output);
2080               free_output_stream (dbin, output);
2081               /* Reacalculate min interleave */
2082               gst_decodebin3_update_min_interleave (dbin);
2083             }
2084             slot->probe_id = 0;
2085             dbin->slots = g_list_remove (dbin->slots, slot);
2086             free_multiqueue_slot_async (dbin, slot);
2087             ret = GST_PAD_PROBE_REMOVE;
2088           } else if (!was_drained) {
2089             check_all_slot_for_eos (dbin, ev);
2090           }
2091           if (ret == GST_PAD_PROBE_HANDLED)
2092             gst_event_unref (ev);
2093           SELECTION_UNLOCK (dbin);
2094           break;
2095         }
2096
2097         GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
2098             slot->input);
2099         if (slot->input == NULL) {
2100           GstPad *peer;
2101           GST_DEBUG_OBJECT (pad,
2102               "last EOS for input, forwarding and removing slot");
2103           peer = gst_pad_get_peer (pad);
2104           if (peer) {
2105             gst_pad_send_event (peer, gst_event_ref (ev));
2106             gst_object_unref (peer);
2107           }
2108           SELECTION_LOCK (dbin);
2109           /* FIXME : Shouldn't we try to re-assign the output instead of just
2110            * removing it ? */
2111           /* Remove the output */
2112           if (slot->output) {
2113             DecodebinOutputStream *output = slot->output;
2114             dbin->output_streams = g_list_remove (dbin->output_streams, output);
2115             free_output_stream (dbin, output);
2116           }
2117           slot->probe_id = 0;
2118           dbin->slots = g_list_remove (dbin->slots, slot);
2119           SELECTION_UNLOCK (dbin);
2120
2121           /* FIXME: Removing the slot is async, which means actually
2122            * unlinking the pad is async. Other things like stream-start
2123            * might flow through this (now unprobed) link before it actually
2124            * gets released */
2125           free_multiqueue_slot_async (dbin, slot);
2126           ret = GST_PAD_PROBE_REMOVE;
2127         } else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2128                 CUSTOM_FINAL_EOS_QUARK)) {
2129           GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream");
2130         } else {
2131           GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)");
2132           /* drop current event as eos will be sent in check_all_slot_for_eos
2133            * when all output streams are also eos */
2134           ret = GST_PAD_PROBE_DROP;
2135           SELECTION_LOCK (dbin);
2136           check_all_slot_for_eos (dbin, ev);
2137           SELECTION_UNLOCK (dbin);
2138         }
2139       }
2140         break;
2141       default:
2142         break;
2143     }
2144   } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
2145     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
2146     switch (GST_QUERY_TYPE (query)) {
2147       case GST_QUERY_CAPS:
2148       {
2149         GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
2150         gst_query_set_caps_result (query, GST_CAPS_ANY);
2151         ret = GST_PAD_PROBE_HANDLED;
2152       }
2153         break;
2154
2155       case GST_QUERY_ACCEPT_CAPS:
2156       {
2157         GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
2158         /* If the current decoder doesn't accept caps, we'll reconfigure
2159          * on the actual caps event. So accept any caps. */
2160         gst_query_set_accept_caps_result (query, TRUE);
2161         ret = GST_PAD_PROBE_HANDLED;
2162       }
2163       default:
2164         break;
2165     }
2166   }
2167
2168   return ret;
2169 }
2170
2171 /* Create a new multiqueue slot for the given type
2172  *
2173  * It is up to the caller to know whether that slot is needed or not
2174  * (and release it when no longer needed) */
2175 static MultiQueueSlot *
2176 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
2177 {
2178   MultiQueueSlot *slot;
2179   GstIterator *it = NULL;
2180   GValue item = { 0, };
2181
2182   GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
2183       gst_stream_type_get_name (type));
2184   slot = g_new0 (MultiQueueSlot, 1);
2185   slot->dbin = dbin;
2186
2187   slot->id = dbin->slot_id++;
2188
2189   slot->type = type;
2190   slot->sink_pad = gst_element_request_pad_simple (dbin->multiqueue, "sink_%u");
2191   if (slot->sink_pad == NULL)
2192     goto fail;
2193
2194   it = gst_pad_iterate_internal_links (slot->sink_pad);
2195   if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
2196       || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
2197     GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
2198         GST_DEBUG_PAD_NAME (slot->src_pad));
2199     goto fail;
2200   }
2201   gst_iterator_free (it);
2202   g_value_reset (&item);
2203
2204   g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
2205
2206   /* Add event probe */
2207   slot->probe_id =
2208       gst_pad_add_probe (slot->src_pad,
2209       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2210       (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
2211
2212   GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
2213       GST_DEBUG_PAD_NAME (slot->src_pad));
2214
2215   dbin->slots = g_list_append (dbin->slots, slot);
2216
2217   return slot;
2218
2219   /* ERRORS */
2220 fail:
2221   {
2222     if (slot->sink_pad)
2223       gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2224     g_free (slot);
2225     return NULL;
2226   }
2227 }
2228
2229 /* Must be called with SELECTION_LOCK */
2230 static MultiQueueSlot *
2231 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
2232 {
2233   GList *tmp;
2234   MultiQueueSlot *empty_slot = NULL;
2235   GstStreamType input_type = 0;
2236   gchar *stream_id = NULL;
2237
2238   GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
2239       input, input->active_stream,
2240       input->
2241       active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
2242
2243   if (input->active_stream) {
2244     input_type = gst_stream_get_stream_type (input->active_stream);
2245     stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
2246   }
2247
2248   /* Go over existing slots and check if there is already one for it */
2249   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2250     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2251     /* Already used input, return that one */
2252     if (slot->input == input) {
2253       GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
2254       return slot;
2255     }
2256   }
2257
2258   /* Go amongst all unused slots of the right type and try to find a candidate */
2259   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2260     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2261     if (slot->input == NULL && input_type == slot->type) {
2262       /* Remember this empty slot for later */
2263       empty_slot = slot;
2264       /* Check if available slot is of the same stream_id */
2265       GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
2266           slot->id, slot->active_stream);
2267       if (stream_id && slot->active_stream) {
2268         gchar *ostream_id =
2269             (gchar *) gst_stream_get_stream_id (slot->active_stream);
2270         GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
2271             ostream_id, stream_id);
2272         if (!g_strcmp0 (stream_id, ostream_id))
2273           break;
2274       }
2275     }
2276   }
2277
2278   if (empty_slot) {
2279     GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
2280     empty_slot->input = input;
2281     return empty_slot;
2282   }
2283
2284   if (input_type)
2285     return create_new_slot (dbin, input_type);
2286
2287   return NULL;
2288 }
2289
2290 static void
2291 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
2292 {
2293   if (slot->input != NULL && slot->input != input) {
2294     GST_ERROR_OBJECT (slot->dbin,
2295         "Trying to link input to an already used slot");
2296     return;
2297   }
2298   gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
2299   slot->pending_stream = input->active_stream;
2300   slot->input = input;
2301 }
2302
2303 #if 0
2304 static gboolean
2305 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
2306     GstElementFactoryListType ftype)
2307 {
2308   gboolean ret = FALSE;
2309   GList *res;
2310
2311   g_mutex_lock (&dbin->factories_lock);
2312   gst_decode_bin_update_factories_list (dbin);
2313   if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
2314     res =
2315         gst_element_factory_list_filter (dbin->decoder_factories,
2316         caps, GST_PAD_SINK, TRUE);
2317   else
2318     res =
2319         gst_element_factory_list_filter (dbin->decodable_factories,
2320         caps, GST_PAD_SINK, TRUE);
2321   g_mutex_unlock (&dbin->factories_lock);
2322
2323   if (res) {
2324     ret = TRUE;
2325     gst_plugin_feature_list_free (res);
2326   }
2327
2328   return ret;
2329 }
2330 #endif
2331
2332 static GList *
2333 create_decoder_factory_list (GstDecodebin3 * dbin, GstCaps * caps)
2334 {
2335   GList *res;
2336
2337   g_mutex_lock (&dbin->factories_lock);
2338   gst_decode_bin_update_factories_list (dbin);
2339   res = gst_element_factory_list_filter (dbin->decoder_factories,
2340       caps, GST_PAD_SINK, TRUE);
2341   g_mutex_unlock (&dbin->factories_lock);
2342   return res;
2343 }
2344
2345 static GstPadProbeReturn
2346 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
2347     DecodebinOutputStream * output)
2348 {
2349   GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2350   /* If we have a keyframe, remove the probe and let all data through */
2351   /* FIXME : HANDLE HEADER BUFFER ?? */
2352   if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
2353       GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
2354     GST_DEBUG_OBJECT (pad,
2355         "Buffer is keyframe or header, letting through and removing probe");
2356     output->drop_probe_id = 0;
2357     return GST_PAD_PROBE_REMOVE;
2358   }
2359   GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
2360   return GST_PAD_PROBE_DROP;
2361 }
2362
2363 static void
2364 reconfigure_output_stream (DecodebinOutputStream * output,
2365     MultiQueueSlot * slot)
2366 {
2367   GstDecodebin3 *dbin = output->dbin;
2368   GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
2369   gboolean needs_decoder;
2370
2371   needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
2372
2373   GST_DEBUG_OBJECT (dbin,
2374       "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
2375       needs_decoder);
2376
2377   /* FIXME : Maybe make the output un-hook itself automatically ? */
2378   if (output->slot != NULL && output->slot != slot) {
2379     GST_WARNING_OBJECT (dbin,
2380         "Output still linked to another slot (%p)", output->slot);
2381     gst_caps_unref (new_caps);
2382     return;
2383   }
2384
2385   /* Check if existing config is reusable as-is by checking if
2386    * the existing decoder accepts the new caps, if not delete
2387    * it and create a new one */
2388   if (output->decoder) {
2389     gboolean can_reuse_decoder;
2390
2391     if (needs_decoder) {
2392       can_reuse_decoder =
2393           gst_pad_query_accept_caps (output->decoder_sink, new_caps);
2394     } else
2395       can_reuse_decoder = FALSE;
2396
2397     if (can_reuse_decoder) {
2398       if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
2399         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2400         output->drop_probe_id =
2401             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2402             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2403       }
2404       GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
2405       if (output->linked == FALSE) {
2406         gst_pad_link_full (slot->src_pad, output->decoder_sink,
2407             GST_PAD_LINK_CHECK_NOTHING);
2408         output->linked = TRUE;
2409       }
2410       gst_caps_unref (new_caps);
2411       return;
2412     }
2413
2414     GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
2415
2416     if (output->linked)
2417       gst_pad_unlink (slot->src_pad, output->decoder_sink);
2418     output->linked = FALSE;
2419     if (output->drop_probe_id) {
2420       gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2421       output->drop_probe_id = 0;
2422     }
2423
2424     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2425       GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
2426       gst_caps_unref (new_caps);
2427       goto cleanup;
2428     }
2429
2430     gst_element_set_locked_state (output->decoder, TRUE);
2431     gst_element_set_state (output->decoder, GST_STATE_NULL);
2432
2433     gst_bin_remove ((GstBin *) dbin, output->decoder);
2434     output->decoder = NULL;
2435     output->decoder_latency = GST_CLOCK_TIME_NONE;
2436   } else if (output->linked) {
2437     /* Otherwise if we have no decoder yet but the output is linked make
2438      * sure that the ghost pad is really unlinked in case no decoder was
2439      * needed previously */
2440     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2441       GST_ERROR_OBJECT (dbin, "Could not release ghost pad");
2442       gst_caps_unref (new_caps);
2443       goto cleanup;
2444     }
2445   }
2446
2447   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2448   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2449
2450   /* If a decoder is required, create one */
2451   if (needs_decoder) {
2452     GList *factories, *next_factory;
2453
2454     factories = next_factory = create_decoder_factory_list (dbin, new_caps);
2455     while (!output->decoder) {
2456       gboolean decoder_failed = FALSE;
2457
2458       /* If we don't have a decoder yet, instantiate one */
2459       if (next_factory) {
2460         output->decoder = gst_element_factory_create ((GstElementFactory *)
2461             next_factory->data, NULL);
2462         GST_DEBUG ("Created decoder '%s'", GST_ELEMENT_NAME (output->decoder));
2463       } else
2464         GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT,
2465             new_caps);
2466
2467       if (output->decoder == NULL) {
2468         GstCaps *caps;
2469
2470         SELECTION_UNLOCK (dbin);
2471         /* FIXME : Should we be smarter if there's a missing decoder ?
2472          * Should we deactivate that stream ? */
2473         caps = gst_stream_get_caps (slot->active_stream);
2474         gst_element_post_message (GST_ELEMENT_CAST (dbin),
2475             gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2476         gst_caps_unref (caps);
2477         SELECTION_LOCK (dbin);
2478         goto cleanup;
2479       }
2480       if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
2481         GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
2482         goto cleanup;
2483       }
2484       output->decoder_sink =
2485           gst_element_get_static_pad (output->decoder, "sink");
2486       output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
2487       if (output->type & GST_STREAM_TYPE_VIDEO) {
2488         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2489         output->drop_probe_id =
2490             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2491             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2492       }
2493       if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
2494               GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2495         GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
2496             GST_DEBUG_PAD_NAME (output->decoder_sink));
2497         goto cleanup;
2498       }
2499       if (gst_element_set_state (output->decoder,
2500               GST_STATE_READY) == GST_STATE_CHANGE_FAILURE) {
2501         GST_DEBUG_OBJECT (dbin,
2502             "Decoder '%s' failed to reach READY state, trying the next type",
2503             GST_ELEMENT_NAME (output->decoder));
2504         decoder_failed = TRUE;
2505       }
2506       if (!gst_pad_query_accept_caps (output->decoder_sink, new_caps)) {
2507         GST_DEBUG_OBJECT (dbin,
2508             "Decoder '%s' did not accept the caps, trying the next type",
2509             GST_ELEMENT_NAME (output->decoder));
2510         decoder_failed = TRUE;
2511       }
2512       if (decoder_failed) {
2513         gst_pad_unlink (slot->src_pad, output->decoder_sink);
2514         if (output->drop_probe_id) {
2515           gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2516           output->drop_probe_id = 0;
2517         }
2518
2519         gst_element_set_locked_state (output->decoder, TRUE);
2520         gst_element_set_state (output->decoder, GST_STATE_NULL);
2521
2522         gst_bin_remove ((GstBin *) dbin, output->decoder);
2523         output->decoder = NULL;
2524       }
2525       next_factory = next_factory->next;
2526     }
2527     gst_plugin_feature_list_free (factories);
2528   } else {
2529     output->decoder_src = gst_object_ref (slot->src_pad);
2530     output->decoder_sink = NULL;
2531   }
2532   gst_caps_unref (new_caps);
2533
2534   output->linked = TRUE;
2535   if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
2536           output->decoder_src)) {
2537     GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
2538     goto cleanup;
2539   }
2540   if (output->src_exposed == FALSE) {
2541     GstEvent *stream_start;
2542
2543     stream_start = gst_pad_get_sticky_event (slot->src_pad,
2544         GST_EVENT_STREAM_START, 0);
2545
2546     /* Ensure GstStream is accesiable from pad-added callback */
2547     if (stream_start) {
2548       gst_pad_store_sticky_event (output->src_pad, stream_start);
2549       gst_event_unref (stream_start);
2550     } else {
2551       GST_WARNING_OBJECT (slot->src_pad,
2552           "Pad has no stored stream-start event");
2553     }
2554
2555     output->src_exposed = TRUE;
2556     gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
2557   }
2558
2559   if (output->decoder)
2560     gst_element_sync_state_with_parent (output->decoder);
2561
2562   output->slot = slot;
2563   return;
2564
2565 cleanup:
2566   {
2567     GST_DEBUG_OBJECT (dbin, "Cleanup");
2568     if (output->decoder_sink) {
2569       gst_object_unref (output->decoder_sink);
2570       output->decoder_sink = NULL;
2571     }
2572     if (output->decoder_src) {
2573       gst_object_unref (output->decoder_src);
2574       output->decoder_src = NULL;
2575     }
2576     if (output->decoder) {
2577       gst_element_set_state (output->decoder, GST_STATE_NULL);
2578       gst_bin_remove ((GstBin *) dbin, output->decoder);
2579       output->decoder = NULL;
2580     }
2581   }
2582 }
2583
2584 static GstPadProbeReturn
2585 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2586 {
2587   GstMessage *msg = NULL;
2588   DecodebinOutputStream *output;
2589
2590   SELECTION_LOCK (slot->dbin);
2591   output = get_output_for_slot (slot);
2592
2593   GST_DEBUG_OBJECT (pad, "output : %p", output);
2594
2595   if (output) {
2596     reconfigure_output_stream (output, slot);
2597     msg = is_selection_done (slot->dbin);
2598   }
2599   SELECTION_UNLOCK (slot->dbin);
2600   if (msg)
2601     gst_element_post_message ((GstElement *) slot->dbin, msg);
2602
2603   return GST_PAD_PROBE_REMOVE;
2604 }
2605
2606 static MultiQueueSlot *
2607 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2608 {
2609   GList *tmp;
2610
2611   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2612     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2613     const gchar *stream_id;
2614     if (slot->active_stream) {
2615       stream_id = gst_stream_get_stream_id (slot->active_stream);
2616       if (!g_strcmp0 (sid, stream_id))
2617         return slot;
2618     }
2619     if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2620       stream_id = gst_stream_get_stream_id (slot->pending_stream);
2621       if (!g_strcmp0 (sid, stream_id))
2622         return slot;
2623     }
2624   }
2625
2626   return NULL;
2627 }
2628
2629 /* This function handles the reassignment of a slot. Call this from
2630  * the streaming thread of a slot. */
2631 static gboolean
2632 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2633 {
2634   DecodebinOutputStream *output;
2635   MultiQueueSlot *target_slot = NULL;
2636   GList *tmp;
2637   const gchar *sid, *tsid;
2638
2639   SELECTION_LOCK (dbin);
2640   output = slot->output;
2641
2642   if (G_UNLIKELY (slot->active_stream == NULL)) {
2643     GST_DEBUG_OBJECT (slot->src_pad,
2644         "Called on inactive slot (active_stream == NULL)");
2645     SELECTION_UNLOCK (dbin);
2646     return FALSE;
2647   }
2648
2649   if (G_UNLIKELY (output == NULL)) {
2650     GST_DEBUG_OBJECT (slot->src_pad,
2651         "Slot doesn't have any output to be removed");
2652     SELECTION_UNLOCK (dbin);
2653     return FALSE;
2654   }
2655
2656   sid = gst_stream_get_stream_id (slot->active_stream);
2657   GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2658
2659   /* Recheck whether this stream is still in the list of streams to deactivate */
2660   if (stream_in_list (dbin->requested_selection, sid)) {
2661     /* Stream is in the list of requested streams, don't remove */
2662     SELECTION_UNLOCK (dbin);
2663     GST_DEBUG_OBJECT (slot->src_pad,
2664         "Stream '%s' doesn't need to be deactivated", sid);
2665     return FALSE;
2666   }
2667
2668   /* Unlink slot from output */
2669   /* FIXME : Handle flushing ? */
2670   /* FIXME : Handle outputs without decoders */
2671   GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2672       output->decoder_sink);
2673   if (output->decoder_sink)
2674     gst_pad_unlink (slot->src_pad, output->decoder_sink);
2675   output->linked = FALSE;
2676   slot->output = NULL;
2677   output->slot = NULL;
2678   /* Remove sid from active selection */
2679   GST_DEBUG ("Removing '%s' from active_selection", sid);
2680   for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2681     if (!g_strcmp0 (sid, tmp->data)) {
2682       g_free (tmp->data);
2683       dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2684       break;
2685     }
2686
2687   /* Can we re-assign this output to a requested stream ? */
2688   GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2689   for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2690     MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2691     GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2692         tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2693     if (tslot && tslot->type == output->type && tslot->output == NULL) {
2694       GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2695       target_slot = tslot;
2696       tsid = tmp->data;
2697       /* Pass target stream id to requested selection */
2698       dbin->requested_selection =
2699           g_list_append (dbin->requested_selection, g_strdup (tmp->data));
2700       dbin->to_activate = g_list_delete_link (dbin->to_activate, tmp);
2701       break;
2702     }
2703   }
2704
2705   if (target_slot) {
2706     GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2707         target_slot, tsid);
2708     target_slot->output = output;
2709     output->slot = target_slot;
2710     GST_DEBUG ("Adding '%s' to active_selection", tsid);
2711     dbin->active_selection =
2712         g_list_append (dbin->active_selection, (gchar *) g_strdup (tsid));
2713     SELECTION_UNLOCK (dbin);
2714
2715     /* Wakeup the target slot so that it retries to send events/buffers
2716      * thereby triggering the output reconfiguration codepath */
2717     gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2718         (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2719     /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2720   } else {
2721     GstMessage *msg;
2722
2723     dbin->output_streams = g_list_remove (dbin->output_streams, output);
2724     free_output_stream (dbin, output);
2725     msg = is_selection_done (slot->dbin);
2726     SELECTION_UNLOCK (dbin);
2727
2728     if (msg)
2729       gst_element_post_message ((GstElement *) slot->dbin, msg);
2730   }
2731
2732   return TRUE;
2733 }
2734
2735 /* Idle probe called when a slot should be unassigned from its output stream.
2736  * This is needed to ensure nothing is flowing when unlinking the slot.
2737  *
2738  * Also, this method will search for a pending stream which could re-use
2739  * the output stream. */
2740 static GstPadProbeReturn
2741 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2742     MultiQueueSlot * slot)
2743 {
2744   GstDecodebin3 *dbin = slot->dbin;
2745
2746   reassign_slot (dbin, slot);
2747
2748   return GST_PAD_PROBE_REMOVE;
2749 }
2750
2751 static gboolean
2752 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2753     guint32 seqnum)
2754 {
2755   gboolean ret = TRUE;
2756   GList *tmp;
2757   /* List of slots to (de)activate. */
2758   GList *to_deactivate = NULL;
2759   GList *to_activate = NULL;
2760   /* List of unknown stream id, most likely means the event
2761    * should be sent upstream so that elements can expose the requested stream */
2762   GList *unknown = NULL;
2763   GList *to_reassign = NULL;
2764   GList *future_request_streams = NULL;
2765   GList *pending_streams = NULL;
2766   GList *slots_to_reassign = NULL;
2767
2768   SELECTION_LOCK (dbin);
2769   if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2770     GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2771     SELECTION_UNLOCK (dbin);
2772     return TRUE;
2773   }
2774   /* Remove pending select_streams */
2775   g_list_free (dbin->pending_select_streams);
2776   dbin->pending_select_streams = NULL;
2777
2778   /* COMPARE the requested streams to the active and requested streams
2779    * on multiqueue. */
2780
2781   /* First check the slots to activate and which ones are unknown */
2782   for (tmp = select_streams; tmp; tmp = tmp->next) {
2783     const gchar *sid = (const gchar *) tmp->data;
2784     MultiQueueSlot *slot;
2785     GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
2786     slot = find_slot_for_stream_id (dbin, sid);
2787     /* Find the corresponding slot */
2788     if (slot == NULL) {
2789       if (stream_in_collection (dbin, (gchar *) sid)) {
2790         pending_streams = g_list_append (pending_streams, (gchar *) sid);
2791       } else {
2792         GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
2793         unknown = g_list_append (unknown, (gchar *) sid);
2794       }
2795     } else if (slot->output == NULL) {
2796       GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
2797           slot, sid);
2798       to_activate = g_list_append (to_activate, slot);
2799     } else {
2800       GST_DEBUG_OBJECT (dbin,
2801           "Stream '%s' from slot %p is already active on output %p", sid, slot,
2802           slot->output);
2803       future_request_streams =
2804           g_list_append (future_request_streams, (gchar *) sid);
2805     }
2806   }
2807
2808   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2809     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2810     /* For slots that have an output, check if it's part of the streams to
2811      * be active */
2812     if (slot->output) {
2813       gboolean slot_to_deactivate = TRUE;
2814
2815       if (slot->active_stream) {
2816         if (stream_in_list (select_streams,
2817                 gst_stream_get_stream_id (slot->active_stream)))
2818           slot_to_deactivate = FALSE;
2819       }
2820       if (slot_to_deactivate && slot->pending_stream
2821           && slot->pending_stream != slot->active_stream) {
2822         if (stream_in_list (select_streams,
2823                 gst_stream_get_stream_id (slot->pending_stream)))
2824           slot_to_deactivate = FALSE;
2825       }
2826       if (slot_to_deactivate) {
2827         GST_DEBUG_OBJECT (dbin,
2828             "Slot %p (%s) should be deactivated, no longer used", slot,
2829             slot->
2830             active_stream ? gst_stream_get_stream_id (slot->active_stream) :
2831             "NULL");
2832         to_deactivate = g_list_append (to_deactivate, slot);
2833       }
2834     }
2835   }
2836
2837   if (to_deactivate != NULL) {
2838     GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
2839     /* We need to compare what needs to be activated and deactivated in order
2840      * to determine whether there are outputs that can be transferred */
2841     /* Take the stream-id of the slots that are to be activated, for which there
2842      * is a slot of the same type that needs to be deactivated */
2843     tmp = to_deactivate;
2844     while (tmp) {
2845       MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
2846       gboolean removeit = FALSE;
2847       GList *tmp2, *next;
2848       GST_DEBUG_OBJECT (dbin,
2849           "Checking if slot to deactivate (%p) has a candidate slot to activate",
2850           slot_to_deactivate);
2851       for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
2852         MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
2853         GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
2854         if (slot_to_activate->type == slot_to_deactivate->type) {
2855           GST_DEBUG_OBJECT (dbin, "Re-using");
2856           to_reassign = g_list_append (to_reassign, (gchar *)
2857               gst_stream_get_stream_id (slot_to_activate->active_stream));
2858           slots_to_reassign =
2859               g_list_append (slots_to_reassign, slot_to_deactivate);
2860           to_activate = g_list_remove (to_activate, slot_to_activate);
2861           removeit = TRUE;
2862           break;
2863         }
2864       }
2865       next = tmp->next;
2866       if (removeit)
2867         to_deactivate = g_list_delete_link (to_deactivate, tmp);
2868       tmp = next;
2869     }
2870   }
2871
2872   for (tmp = to_deactivate; tmp; tmp = tmp->next) {
2873     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2874     GST_DEBUG_OBJECT (dbin,
2875         "Really need to deactivate slot %p, but no available alternative",
2876         slot);
2877
2878     slots_to_reassign = g_list_append (slots_to_reassign, slot);
2879   }
2880
2881   /* The only slots left to activate are the ones that won't be reassigned and
2882    * therefore really need to have a new output created */
2883   for (tmp = to_activate; tmp; tmp = tmp->next) {
2884     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2885     if (slot->active_stream)
2886       future_request_streams =
2887           g_list_append (future_request_streams,
2888           (gchar *) gst_stream_get_stream_id (slot->active_stream));
2889     else if (slot->pending_stream)
2890       future_request_streams =
2891           g_list_append (future_request_streams,
2892           (gchar *) gst_stream_get_stream_id (slot->pending_stream));
2893     else
2894       GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
2895   }
2896
2897   if (to_activate == NULL && pending_streams != NULL) {
2898     GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
2899     if (dbin->requested_selection)
2900       g_list_free_full (dbin->requested_selection, g_free);
2901     dbin->requested_selection =
2902         g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
2903     g_list_free (to_deactivate);
2904     g_list_free (pending_streams);
2905     to_deactivate = NULL;
2906     pending_streams = NULL;
2907   } else {
2908     if (dbin->requested_selection)
2909       g_list_free_full (dbin->requested_selection, g_free);
2910     dbin->requested_selection =
2911         g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
2912     dbin->requested_selection =
2913         g_list_concat (dbin->requested_selection,
2914         g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
2915     if (dbin->to_activate)
2916       g_list_free (dbin->to_activate);
2917     dbin->to_activate = g_list_copy (to_reassign);
2918   }
2919
2920   dbin->selection_updated = TRUE;
2921   SELECTION_UNLOCK (dbin);
2922
2923   if (unknown) {
2924     GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
2925     g_list_free (unknown);
2926   }
2927
2928   if (to_activate && !slots_to_reassign) {
2929     for (tmp = to_activate; tmp; tmp = tmp->next) {
2930       MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2931       gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2932           (GstPadProbeCallback) idle_reconfigure, slot, NULL);
2933     }
2934   }
2935
2936   /* For all streams to deactivate, add an idle probe where we will do
2937    * the unassignment and switch over */
2938   for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
2939     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2940     gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2941         (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
2942   }
2943
2944   if (to_deactivate)
2945     g_list_free (to_deactivate);
2946   if (to_activate)
2947     g_list_free (to_activate);
2948   if (to_reassign)
2949     g_list_free (to_reassign);
2950   if (future_request_streams)
2951     g_list_free (future_request_streams);
2952   if (pending_streams)
2953     g_list_free (pending_streams);
2954   if (slots_to_reassign)
2955     g_list_free (slots_to_reassign);
2956
2957   return ret;
2958 }
2959
2960 static GstPadProbeReturn
2961 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
2962     DecodebinOutputStream * output)
2963 {
2964   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2965   GstDecodebin3 *dbin = output->dbin;
2966   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
2967
2968   GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
2969
2970   switch (GST_EVENT_TYPE (event)) {
2971     case GST_EVENT_SELECT_STREAMS:
2972     {
2973       GstPad *peer;
2974       GList *streams = NULL;
2975       guint32 seqnum = gst_event_get_seqnum (event);
2976
2977       if (dbin->upstream_selected) {
2978         GST_DEBUG_OBJECT (pad, "Letting select-streams event flow upstream");
2979         break;
2980       }
2981
2982       SELECTION_LOCK (dbin);
2983       if (seqnum == dbin->select_streams_seqnum) {
2984         SELECTION_UNLOCK (dbin);
2985         GST_DEBUG_OBJECT (pad,
2986             "Already handled/handling that SELECT_STREAMS event");
2987         gst_event_unref (event);
2988         ret = GST_PAD_PROBE_HANDLED;
2989         break;
2990       }
2991       dbin->select_streams_seqnum = seqnum;
2992       if (dbin->pending_select_streams != NULL) {
2993         GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2994         g_list_free (dbin->pending_select_streams);
2995         dbin->pending_select_streams = NULL;
2996       }
2997       gst_event_parse_select_streams (event, &streams);
2998       dbin->pending_select_streams = g_list_copy (streams);
2999       SELECTION_UNLOCK (dbin);
3000
3001       /* Send event upstream */
3002       if ((peer = gst_pad_get_peer (pad))) {
3003         gst_pad_send_event (peer, event);
3004         gst_object_unref (peer);
3005       } else {
3006         gst_event_unref (event);
3007       }
3008       /* Finally handle the switch */
3009       if (streams) {
3010         handle_stream_switch (dbin, streams, seqnum);
3011         g_list_free_full (streams, g_free);
3012       }
3013       ret = GST_PAD_PROBE_HANDLED;
3014     }
3015       break;
3016     default:
3017       break;
3018   }
3019
3020   return ret;
3021 }
3022
3023 static gboolean
3024 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
3025 {
3026   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3027
3028   GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
3029   if (!dbin->upstream_selected
3030       && GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
3031     GList *streams = NULL;
3032     guint32 seqnum = gst_event_get_seqnum (event);
3033
3034     SELECTION_LOCK (dbin);
3035     if (seqnum == dbin->select_streams_seqnum) {
3036       SELECTION_UNLOCK (dbin);
3037       GST_DEBUG_OBJECT (dbin,
3038           "Already handled/handling that SELECT_STREAMS event");
3039       return TRUE;
3040     }
3041     dbin->select_streams_seqnum = seqnum;
3042     if (dbin->pending_select_streams != NULL) {
3043       GST_LOG_OBJECT (dbin, "Replacing pending select streams");
3044       g_list_free (dbin->pending_select_streams);
3045       dbin->pending_select_streams = NULL;
3046     }
3047     gst_event_parse_select_streams (event, &streams);
3048     dbin->pending_select_streams = g_list_copy (streams);
3049     SELECTION_UNLOCK (dbin);
3050
3051     /* FIXME : We don't have an upstream ?? */
3052 #if 0
3053     /* Send event upstream */
3054     if ((peer = gst_pad_get_peer (pad))) {
3055       gst_pad_send_event (peer, event);
3056       gst_object_unref (peer);
3057     }
3058 #endif
3059     /* Finally handle the switch */
3060     if (streams) {
3061       handle_stream_switch (dbin, streams, seqnum);
3062       g_list_free_full (streams, g_free);
3063     }
3064
3065     gst_event_unref (event);
3066     return TRUE;
3067   }
3068   return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
3069 }
3070
3071
3072 static void
3073 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3074 {
3075   if (slot->probe_id)
3076     gst_pad_remove_probe (slot->src_pad, slot->probe_id);
3077   if (slot->input) {
3078     if (slot->input->srcpad)
3079       gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
3080   }
3081
3082   gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
3083   gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
3084   gst_object_replace ((GstObject **) & slot->src_pad, NULL);
3085   gst_object_replace ((GstObject **) & slot->active_stream, NULL);
3086   g_free (slot);
3087 }
3088
3089 static void
3090 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3091 {
3092   GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
3093   gst_element_call_async (GST_ELEMENT_CAST (dbin),
3094       (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
3095 }
3096
3097 /* Create a DecodebinOutputStream for a given type
3098  * Note: It will be empty initially, it needs to be configured
3099  * afterwards */
3100 static DecodebinOutputStream *
3101 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
3102 {
3103   DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
3104   gchar *pad_name;
3105   const gchar *prefix;
3106   GstStaticPadTemplate *templ;
3107   GstPadTemplate *ptmpl;
3108   guint32 *counter;
3109   GstPad *internal_pad;
3110
3111   GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
3112       res, gst_stream_type_get_name (type));
3113
3114   res->type = type;
3115   res->dbin = dbin;
3116   res->decoder_latency = GST_CLOCK_TIME_NONE;
3117
3118   if (type & GST_STREAM_TYPE_VIDEO) {
3119     templ = &video_src_template;
3120     counter = &dbin->vpadcount;
3121     prefix = "video";
3122   } else if (type & GST_STREAM_TYPE_AUDIO) {
3123     templ = &audio_src_template;
3124     counter = &dbin->apadcount;
3125     prefix = "audio";
3126   } else if (type & GST_STREAM_TYPE_TEXT) {
3127     templ = &text_src_template;
3128     counter = &dbin->tpadcount;
3129     prefix = "text";
3130   } else {
3131     templ = &src_template;
3132     counter = &dbin->opadcount;
3133     prefix = "src";
3134   }
3135
3136   pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
3137   *counter += 1;
3138   ptmpl = gst_static_pad_template_get (templ);
3139   res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
3140   gst_object_unref (ptmpl);
3141   g_free (pad_name);
3142   gst_pad_set_active (res->src_pad, TRUE);
3143   /* Put an event probe on the internal proxy pad to detect upstream
3144    * events */
3145   internal_pad =
3146       (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
3147   gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
3148       (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
3149   gst_object_unref (internal_pad);
3150
3151   dbin->output_streams = g_list_append (dbin->output_streams, res);
3152
3153   return res;
3154 }
3155
3156 static void
3157 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
3158 {
3159   if (output->slot) {
3160     if (output->decoder_sink && output->decoder)
3161       gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
3162
3163     output->slot->output = NULL;
3164     output->slot = NULL;
3165   }
3166   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
3167   gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
3168   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
3169   if (output->src_exposed) {
3170     gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
3171   }
3172   if (output->decoder) {
3173     gst_element_set_locked_state (output->decoder, TRUE);
3174     gst_element_set_state (output->decoder, GST_STATE_NULL);
3175     gst_bin_remove ((GstBin *) dbin, output->decoder);
3176   }
3177   g_free (output);
3178 }
3179
3180 static GstStateChangeReturn
3181 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
3182 {
3183   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3184   GstStateChangeReturn ret;
3185
3186   /* Upwards */
3187   switch (transition) {
3188     default:
3189       break;
3190   }
3191   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3192   if (ret == GST_STATE_CHANGE_FAILURE)
3193     goto beach;
3194
3195   switch (transition) {
3196     case GST_STATE_CHANGE_PAUSED_TO_READY:
3197     {
3198       GList *tmp;
3199
3200       /* Free output streams */
3201       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
3202         DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
3203         free_output_stream (dbin, output);
3204       }
3205       g_list_free (dbin->output_streams);
3206       dbin->output_streams = NULL;
3207       /* Free multiqueue slots */
3208       for (tmp = dbin->slots; tmp; tmp = tmp->next) {
3209         MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3210         free_multiqueue_slot (dbin, slot);
3211       }
3212       g_list_free (dbin->slots);
3213       dbin->slots = NULL;
3214       dbin->current_group_id = GST_GROUP_ID_INVALID;
3215       /* Free inputs */
3216       /* Reset the main input group id since it will get a new id on a new stream */
3217       dbin->main_input->group_id = GST_GROUP_ID_INVALID;
3218       /* Reset multiqueue to default interleave */
3219       g_object_set (dbin->multiqueue, "min-interleave-time",
3220           dbin->default_mq_min_interleave, NULL);
3221       dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
3222       dbin->upstream_selected = FALSE;
3223       g_list_free_full (dbin->active_selection, g_free);
3224       dbin->active_selection = NULL;
3225     }
3226       break;
3227     default:
3228       break;
3229   }
3230 beach:
3231   return ret;
3232 }