05db8740cd3aede571e2028a6282f8e05b5ee875
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-base / gst / playback / gstdecodebin3.c
1 /* GStreamer
2  *
3  * Copyright (C) <2015> Centricular Ltd
4  *  @author: Edward Hervey <edward@centricular.com>
5  *  @author: Jan Schmidt <jan@centricular.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <glib.h>
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
30 #include <gst/gst.h>
31 #include <gst/pbutils/pbutils.h>
32
33 #include "gstplaybackelements.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
36
37 /**
38  * SECTION:element-decodebin3
39  * @title: decodebin3
40  *
41  * #GstBin that auto-magically constructs a decoding pipeline using available
42  * decoders and demuxers via auto-plugging. The output is raw audio, video
43  * or subtitle streams.
44  *
45  * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
46  *
47  * * supports publication and selection of stream information via
48  * GstStreamCollection messages and #GST_EVENT_SELECT_STREAMS events.
49  *
50  * * dynamically switches stream connections internally, and
51  * reuses decoder elements when stream selections change, so that in
52  * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53  * and only creates new elements when streams change and an existing decoder
54  * is not capable of handling the new format.
55  *
56  * * supports multiple input pads for the parallel decoding of auxiliary streams
57  * not muxed with the primary stream.
58  *
59  * * does not handle network stream buffering. decodebin3 expects that network stream
60  * buffering is handled upstream, before data is passed to it.
61  *
62  * > decodebin3 is still experimental API and a technology preview.
63  * > Its behaviour and exposed API is subject to change.
64  *
65  */
66
67 /*
68  * Global design
69  *
70  * 1) From sink pad to elementary streams (GstParseBin)
71  *
72  * The input sink pads are fed to GstParseBin. GstParseBin will feed them
73  * through typefind. When the caps are detected (or changed) we recursively
74  * figure out which demuxer, parser or depayloader is needed until we get to
75  * elementary streams.
76  *
77  * All elementary streams (whether decoded or not, whether exposed or not) are
78  * fed through multiqueue. There is only *one* multiqueue in decodebin3.
79  *
80  * => MultiQueue is the cornerstone.
81  * => No buffering before multiqueue
82  *
83  * 2) Elementary streams
84  *
85  * After GstParseBin, there are 3 main components:
86  *  1) Input Streams (provided by GstParseBin)
87  *  2) Multiqueue slots
88  *  3) Output Streams
89  *
90  * Input Streams correspond to the stream coming from GstParseBin and that gets
91  * fed into a multiqueue slot.
92  *
93  * Output Streams correspond to the combination of a (optional) decoder and an
94  * output ghostpad. Output Streams can be moved from one multiqueue slot to
95  * another, can reconfigure itself (different decoders), and can be
96  * added/removed depending on the configuration (all streams outputted, only one
97  * of each type, ...).
98  *
99  * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
100  * each 'active' Input Stream there is a corresponding slot.
101  * Slots might have different streams on input and output (due to internal
102  * buffering).
103  *
104  * Due to internal queuing/buffering/..., all those components (might) behave
105  * asynchronously. Therefore probes will be used on each component source pad to
106  * detect various key-points:
107  *  * EOS :
108  *     the stream is done => Mark that component as done, optionally freeing/removing it
109  *  * STREAM_START :
110  *     a new stream is starting => link it further if needed
111  *
112  * 3) Gradual replacement
113  *
114  * If the caps change at any point in decodebin (input sink pad, demuxer output,
115  * multiqueue output, ..), we gradually replace (if needed) the following elements.
116  *
117  * This is handled by the probes in various locations:
118  *  a) typefind output
119  *  b) multiqueue input (source pad of Input Streams)
120  *  c) multiqueue output (source pad of Multiqueue Slots)
121  *  d) final output (target of source ghostpads)
122  *
123  * When CAPS event arrive at those points, one of three things can happen:
124  * a) There is no elements downstream yet, just create/link-to following elements
125  * b) There are downstream elements, do a ACCEPT_CAPS query
126  *  b.1) The new CAPS are accepted, keep current configuration
127  *  b.2) The new CAPS are not accepted, remove following elements then do a)
128  *
129  *    Components:
130  *
131  *                                                   MultiQ     Output
132  *                     Input(s)                      Slots      Streams
133  *  /-------------------------------------------\   /-----\  /------------- \
134  *
135  * +-------------------------------------------------------------------------+
136  * |                                                                         |
137  * | +---------------------------------------------+                         |
138  * | |   GstParseBin(s)                            |                         |
139  * | |                +--------------+             |  +-----+                |
140  * | |                |              |---[parser]-[|--| Mul |---[ decoder ]-[|
141  * |]--[ typefind ]---|  demuxer(s)  |------------[|  | ti  |                |
142  * | |                |  (if needed) |---[parser]-[|--| qu  |                |
143  * | |                |              |---[parser]-[|--| eu  |---[ decoder ]-[|
144  * | |                +--------------+             |  +------             ^  |
145  * | +---------------------------------------------+        ^             |  |
146  * |                                               ^        |             |  |
147  * +-----------------------------------------------+--------+-------------+--+
148  *                                                 |        |             |
149  *                                                 |        |             |
150  *                                       Probes  --/--------/-------------/
151  *
152  * ATOMIC SWITCHING
153  *
154  * We want to ensure we re-use decoders when switching streams. This takes place
155  * at the multiqueue output level.
156  *
157  * MAIN CONCEPTS
158  *  1) Activating a stream (i.e. linking a slot to an output) is only done within
159  *    the streaming thread in the multiqueue_src_probe() and only if the
160       stream is in the REQUESTED selection.
161  *  2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
162  *    within the stream thread, but only in a purposefully called IDLE probe
163  *    that calls reassign_slot().
164  *
165  * Based on those two principles, 3 "selection" of streams (stream-id) are used:
166  * 1) requested_selection
167  *    All streams within that list should be activated
168  * 2) active_selection
169  *    List of streams that are exposed by decodebin
170  * 3) to_activate
171  *    List of streams that will be moved to requested_selection in the
172  *    reassign_slot() method (i.e. once a stream was deactivated, and the output
173  *    was retargetted)
174  */
175
176
177 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
178 #define GST_CAT_DEFAULT decodebin3_debug
179
180 #define GST_TYPE_DECODEBIN3      (gst_decodebin3_get_type ())
181
182 #define EXTRA_DEBUG 1
183
184 #define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
185 #define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"
186 static GQuark
187 _custom_final_eos_quark_get (void)
188 {
189   static gsize g_quark;
190
191   if (g_once_init_enter (&g_quark)) {
192     gsize quark =
193         (gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
194     g_once_init_leave (&g_quark, quark);
195   }
196   return g_quark;
197 }
198
199 typedef struct _GstDecodebin3 GstDecodebin3;
200 typedef struct _GstDecodebin3Class GstDecodebin3Class;
201
202 typedef struct _DecodebinInputStream DecodebinInputStream;
203 typedef struct _DecodebinInput DecodebinInput;
204 typedef struct _DecodebinOutputStream DecodebinOutputStream;
205
206 struct _GstDecodebin3
207 {
208   GstBin bin;
209
210   /* input_lock protects the following variables */
211   GMutex input_lock;
212   /* Main input (static sink pad) */
213   DecodebinInput *main_input;
214   /* Supplementary input (request sink pads) */
215   GList *other_inputs;
216   /* counter for input */
217   guint32 input_counter;
218   /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
219   /* FIXME : Needs to be reset appropriately (when upstream changes ?) */
220   guint32 current_group_id;
221   /* End of variables protected by input_lock */
222
223   GstElement *multiqueue;
224   GstClockTime default_mq_min_interleave;
225   GstClockTime current_mq_min_interleave;
226
227   /* selection_lock protects access to following variables */
228   GMutex selection_lock;
229   GList *input_streams;         /* List of DecodebinInputStream for active collection */
230   GList *output_streams;        /* List of DecodebinOutputStream used for output */
231   GList *slots;                 /* List of MultiQueueSlot */
232   guint slot_id;
233
234   /* Active collection */
235   GstStreamCollection *collection;
236   /* requested selection of stream-id to activate post-multiqueue */
237   GList *requested_selection;
238   /* list of stream-id currently activated in output */
239   GList *active_selection;
240   /* List of stream-id that need to be activated (after a stream switch for ex) */
241   GList *to_activate;
242   /* Pending select streams event */
243   guint32 select_streams_seqnum;
244   /* pending list of streams to select (from downstream) */
245   GList *pending_select_streams;
246   /* TRUE if requested_selection was updated, will become FALSE once
247    * it has fully transitioned to active */
248   gboolean selection_updated;
249   /* End of variables protected by selection_lock */
250   gboolean upstream_selected;
251
252   /* List of pending collections.
253    * FIXME : Is this really needed ? */
254   GList *pending_collection;
255
256   /* Factories */
257   GMutex factories_lock;
258   guint32 factories_cookie;
259   /* All DECODABLE factories */
260   GList *factories;
261   /* Only DECODER factories */
262   GList *decoder_factories;
263   /* DECODABLE but not DECODER factories */
264   GList *decodable_factories;
265
266   /* counters for pads */
267   guint32 apadcount, vpadcount, tpadcount, opadcount;
268
269   /* Properties */
270   GstCaps *caps;
271 };
272
273 struct _GstDecodebin3Class
274 {
275   GstBinClass class;
276
277     gint (*select_stream) (GstDecodebin3 * dbin,
278       GstStreamCollection * collection, GstStream * stream);
279 };
280
281 /* Input of decodebin, controls input pad and parsebin */
282 struct _DecodebinInput
283 {
284   GstDecodebin3 *dbin;
285
286   gboolean is_main;
287
288   GstPad *ghost_sink;
289   GstPad *parsebin_sink;
290
291   GstStreamCollection *collection;      /* Active collection */
292   gboolean upstream_selected;
293
294   guint group_id;
295
296   GstElement *parsebin;
297
298   gulong pad_added_sigid;
299   gulong pad_removed_sigid;
300   gulong drained_sigid;
301
302   /* TRUE if the input got drained
303    * FIXME : When do we reset it if re-used ?
304    */
305   gboolean drained;
306
307   /* HACK : Remove these fields */
308   /* List of PendingPad structures */
309   GList *pending_pads;
310 };
311
312 /* Multiqueue Slots */
313 typedef struct _MultiQueueSlot
314 {
315   guint id;
316
317   GstDecodebin3 *dbin;
318   /* Type of stream handled by this slot */
319   GstStreamType type;
320
321   /* Linked input and output */
322   DecodebinInputStream *input;
323
324   /* pending => last stream received on sink pad */
325   GstStream *pending_stream;
326   /* active => last stream outputted on source pad */
327   GstStream *active_stream;
328
329   GstPad *sink_pad, *src_pad;
330
331   /* id of the MQ src_pad event probe */
332   gulong probe_id;
333
334   gboolean is_drained;
335
336   DecodebinOutputStream *output;
337 } MultiQueueSlot;
338
339 /* Streams that are exposed downstream (i.e. output) */
340 struct _DecodebinOutputStream
341 {
342   GstDecodebin3 *dbin;
343   /* The type of stream handled by this output stream */
344   GstStreamType type;
345
346   /* The slot to which this output stream is currently connected to */
347   MultiQueueSlot *slot;
348
349   GstElement *decoder;          /* Optional */
350   GstPad *decoder_sink, *decoder_src;
351   gboolean linked;
352
353   /* ghostpad */
354   GstPad *src_pad;
355   /* Flag if ghost pad is exposed */
356   gboolean src_exposed;
357
358   /* Reported decoder latency */
359   GstClockTime decoder_latency;
360
361   /* keyframe dropping probe */
362   gulong drop_probe_id;
363 };
364
365 /* Pending pads from parsebin */
366 typedef struct _PendingPad
367 {
368   GstDecodebin3 *dbin;
369   DecodebinInput *input;
370   GstPad *pad;
371
372   gulong buffer_probe;
373   gulong event_probe;
374   gboolean saw_eos;
375 } PendingPad;
376
377 /* properties */
378 enum
379 {
380   PROP_0,
381   PROP_CAPS
382 };
383
384 /* signals */
385 enum
386 {
387   SIGNAL_SELECT_STREAM,
388   SIGNAL_ABOUT_TO_FINISH,
389   LAST_SIGNAL
390 };
391 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
392
393 #define SELECTION_LOCK(dbin) G_STMT_START {                             \
394     GST_LOG_OBJECT (dbin,                                               \
395                     "selection locking from thread %p",                 \
396                     g_thread_self ());                                  \
397     g_mutex_lock (&dbin->selection_lock);                               \
398     GST_LOG_OBJECT (dbin,                                               \
399                     "selection locked from thread %p",                  \
400                     g_thread_self ());                                  \
401   } G_STMT_END
402
403 #define SELECTION_UNLOCK(dbin) G_STMT_START {                           \
404     GST_LOG_OBJECT (dbin,                                               \
405                     "selection unlocking from thread %p",               \
406                     g_thread_self ());                                  \
407     g_mutex_unlock (&dbin->selection_lock);                             \
408   } G_STMT_END
409
410 #define INPUT_LOCK(dbin) G_STMT_START {                         \
411     GST_LOG_OBJECT (dbin,                                               \
412                     "input locking from thread %p",                     \
413                     g_thread_self ());                                  \
414     g_mutex_lock (&dbin->input_lock);                           \
415     GST_LOG_OBJECT (dbin,                                               \
416                     "input locked from thread %p",                      \
417                     g_thread_self ());                                  \
418   } G_STMT_END
419
420 #define INPUT_UNLOCK(dbin) G_STMT_START {                               \
421     GST_LOG_OBJECT (dbin,                                               \
422                     "input unlocking from thread %p",           \
423                     g_thread_self ());                                  \
424     g_mutex_unlock (&dbin->input_lock);                         \
425   } G_STMT_END
426
427 GType gst_decodebin3_get_type (void);
428 #define gst_decodebin3_parent_class parent_class
429 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
430 #define _do_init \
431     GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");\
432     playback_element_init (plugin);
433 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (decodebin3, "decodebin3", GST_RANK_NONE,
434     GST_TYPE_DECODEBIN3, _do_init);
435
436 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
437
438 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
439     GST_PAD_SINK,
440     GST_PAD_ALWAYS,
441     GST_STATIC_CAPS_ANY);
442
443 static GstStaticPadTemplate request_sink_template =
444 GST_STATIC_PAD_TEMPLATE ("sink_%u",
445     GST_PAD_SINK,
446     GST_PAD_REQUEST,
447     GST_STATIC_CAPS_ANY);
448
449 static GstStaticPadTemplate video_src_template =
450 GST_STATIC_PAD_TEMPLATE ("video_%u",
451     GST_PAD_SRC,
452     GST_PAD_SOMETIMES,
453     GST_STATIC_CAPS_ANY);
454
455 static GstStaticPadTemplate audio_src_template =
456 GST_STATIC_PAD_TEMPLATE ("audio_%u",
457     GST_PAD_SRC,
458     GST_PAD_SOMETIMES,
459     GST_STATIC_CAPS_ANY);
460
461 static GstStaticPadTemplate text_src_template =
462 GST_STATIC_PAD_TEMPLATE ("text_%u",
463     GST_PAD_SRC,
464     GST_PAD_SOMETIMES,
465     GST_STATIC_CAPS_ANY);
466
467 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
468     GST_PAD_SRC,
469     GST_PAD_SOMETIMES,
470     GST_STATIC_CAPS_ANY);
471
472
473 static void gst_decodebin3_dispose (GObject * object);
474 static void gst_decodebin3_finalize (GObject * object);
475 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
476     const GValue * value, GParamSpec * pspec);
477 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
478     GValue * value, GParamSpec * pspec);
479
480 static gboolean parsebin_autoplug_continue_cb (GstElement *
481     parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
482
483 static gint
484 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
485     GstStreamCollection * collection, GstStream * stream)
486 {
487   GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
488
489   return -1;
490 }
491
492 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
493     GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
494 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
495 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
496     GstStateChange transition);
497 static gboolean gst_decodebin3_send_event (GstElement * element,
498     GstEvent * event);
499
500 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
501 #if 0
502 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
503     GstElementFactoryListType ftype);
504 #endif
505
506 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
507 static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input);
508 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
509 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
510
511 static void reconfigure_output_stream (DecodebinOutputStream * output,
512     MultiQueueSlot * slot);
513 static void free_output_stream (GstDecodebin3 * dbin,
514     DecodebinOutputStream * output);
515 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
516     GstStreamType type);
517
518 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
519     GstPadProbeInfo * info, MultiQueueSlot * slot);
520 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
521 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
522     DecodebinInputStream * input);
523 static void link_input_to_slot (DecodebinInputStream * input,
524     MultiQueueSlot * slot);
525 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
526 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
527     MultiQueueSlot * slot);
528
529 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
530 static void update_requested_selection (GstDecodebin3 * dbin);
531
532 /* FIXME: Really make all the parser stuff a self-contained helper object */
533 #include "gstdecodebin3-parse.c"
534
535 static gboolean
536 _gst_int_accumulator (GSignalInvocationHint * ihint,
537     GValue * return_accu, const GValue * handler_return, gpointer dummy)
538 {
539   gint res = g_value_get_int (handler_return);
540
541   g_value_set_int (return_accu, res);
542
543   if (res == -1)
544     return TRUE;
545
546   return FALSE;
547 }
548
549 static void
550 gst_decodebin3_class_init (GstDecodebin3Class * klass)
551 {
552   GObjectClass *gobject_klass = (GObjectClass *) klass;
553   GstElementClass *element_class = (GstElementClass *) klass;
554   GstBinClass *bin_klass = (GstBinClass *) klass;
555
556   gobject_klass->dispose = gst_decodebin3_dispose;
557   gobject_klass->finalize = gst_decodebin3_finalize;
558   gobject_klass->set_property = gst_decodebin3_set_property;
559   gobject_klass->get_property = gst_decodebin3_get_property;
560
561   /* FIXME : ADD PROPERTIES ! */
562   g_object_class_install_property (gobject_klass, PROP_CAPS,
563       g_param_spec_boxed ("caps", "Caps",
564           "The caps on which to stop decoding. (NULL = default)",
565           GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
566
567   /* FIXME : ADD SIGNALS ! */
568   /**
569    * GstDecodebin3::select-stream
570    * @decodebin: a #GstDecodebin3
571    * @collection: a #GstStreamCollection
572    * @stream: a #GstStream
573    *
574    * This signal is emitted whenever @decodebin needs to decide whether
575    * to expose a @stream of a given @collection.
576    *
577    * Note that the prefered way to select streams is to listen to
578    * GST_MESSAGE_STREAM_COLLECTION on the bus and send a
579    * GST_EVENT_SELECT_STREAMS with the streams the user wants.
580    *
581    * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
582    * A value of -1 (default) lets @decodebin decide what to do with the stream.
583    * */
584   gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
585       g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
586       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
587       _gst_int_accumulator, NULL, NULL,
588       G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
589
590   /**
591    * GstDecodebin3::about-to-finish:
592    *
593    * This signal is emitted when the data for the selected URI is
594    * entirely buffered and it is safe to specify another URI.
595    */
596   gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
597       g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
598       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
599
600
601   element_class->request_new_pad =
602       GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
603   element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
604   element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
605
606   gst_element_class_add_pad_template (element_class,
607       gst_static_pad_template_get (&sink_template));
608   gst_element_class_add_pad_template (element_class,
609       gst_static_pad_template_get (&request_sink_template));
610   gst_element_class_add_pad_template (element_class,
611       gst_static_pad_template_get (&video_src_template));
612   gst_element_class_add_pad_template (element_class,
613       gst_static_pad_template_get (&audio_src_template));
614   gst_element_class_add_pad_template (element_class,
615       gst_static_pad_template_get (&text_src_template));
616   gst_element_class_add_pad_template (element_class,
617       gst_static_pad_template_get (&src_template));
618
619   gst_element_class_set_static_metadata (element_class,
620       "Decoder Bin 3", "Generic/Bin/Decoder",
621       "Autoplug and decode to raw media",
622       "Edward Hervey <edward@centricular.com>");
623
624   bin_klass->handle_message = gst_decodebin3_handle_message;
625
626   klass->select_stream = gst_decodebin3_select_stream;
627 }
628
629 static void
630 gst_decodebin3_init (GstDecodebin3 * dbin)
631 {
632   /* Create main input */
633   dbin->main_input = create_new_input (dbin, TRUE);
634
635   dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
636   g_object_get (dbin->multiqueue, "min-interleave-time",
637       &dbin->default_mq_min_interleave, NULL);
638   dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
639   g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
640       "max-size-buffers", 0, "use-interleave", TRUE, NULL);
641   gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
642
643   dbin->current_group_id = GST_GROUP_ID_INVALID;
644
645   g_mutex_init (&dbin->factories_lock);
646   g_mutex_init (&dbin->selection_lock);
647   g_mutex_init (&dbin->input_lock);
648
649   dbin->caps = gst_static_caps_get (&default_raw_caps);
650
651   GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
652 }
653
654 static void
655 gst_decodebin3_dispose (GObject * object)
656 {
657   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
658   GList *walk, *next;
659
660   if (dbin->factories)
661     gst_plugin_feature_list_free (dbin->factories);
662   if (dbin->decoder_factories)
663     g_list_free (dbin->decoder_factories);
664   if (dbin->decodable_factories)
665     g_list_free (dbin->decodable_factories);
666   g_list_free_full (dbin->requested_selection, g_free);
667   g_list_free_full (dbin->active_selection, g_free);
668   g_list_free (dbin->to_activate);
669   g_list_free (dbin->pending_select_streams);
670   g_clear_object (&dbin->collection);
671
672   free_input (dbin, dbin->main_input);
673
674   for (walk = dbin->other_inputs; walk; walk = next) {
675     DecodebinInput *input = walk->data;
676
677     next = g_list_next (walk);
678
679     free_input (dbin, input);
680     dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
681   }
682
683   G_OBJECT_CLASS (parent_class)->dispose (object);
684 }
685
686 static void
687 gst_decodebin3_finalize (GObject * object)
688 {
689   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
690
691   g_mutex_clear (&dbin->factories_lock);
692   g_mutex_clear (&dbin->selection_lock);
693   g_mutex_clear (&dbin->input_lock);
694
695   G_OBJECT_CLASS (parent_class)->finalize (object);
696 }
697
698 static void
699 gst_decodebin3_set_property (GObject * object, guint prop_id,
700     const GValue * value, GParamSpec * pspec)
701 {
702   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
703
704   /* FIXME : IMPLEMENT */
705   switch (prop_id) {
706     case PROP_CAPS:
707       GST_OBJECT_LOCK (dbin);
708       if (dbin->caps)
709         gst_caps_unref (dbin->caps);
710       dbin->caps = g_value_dup_boxed (value);
711       GST_OBJECT_UNLOCK (dbin);
712       break;
713     default:
714       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
715       break;
716   }
717 }
718
719 static void
720 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
721     GParamSpec * pspec)
722 {
723   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
724
725   /* FIXME : IMPLEMENT */
726   switch (prop_id) {
727     case PROP_CAPS:
728       GST_OBJECT_LOCK (dbin);
729       g_value_set_boxed (value, dbin->caps);
730       GST_OBJECT_UNLOCK (dbin);
731       break;
732     default:
733       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
734       break;
735   }
736 }
737
738 static gboolean
739 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
740     GstCaps * caps, GstDecodebin3 * dbin)
741 {
742   GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
743
744   /* If it matches our target caps, expose it */
745   if (gst_caps_can_intersect (caps, dbin->caps))
746     return FALSE;
747
748   return TRUE;
749 }
750
751 /* This method should be called whenever a STREAM_START event
752  * comes out of a given parsebin.
753  * The caller shall replace the group_id if the function returns TRUE */
754 static gboolean
755 set_input_group_id (DecodebinInput * input, guint32 * group_id)
756 {
757   GstDecodebin3 *dbin = input->dbin;
758
759   if (input->group_id != *group_id) {
760     if (input->group_id != GST_GROUP_ID_INVALID)
761       GST_WARNING_OBJECT (dbin,
762           "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
763           ") on input %p ", input->group_id, *group_id, input);
764     input->group_id = *group_id;
765   }
766
767   if (*group_id != dbin->current_group_id) {
768     if (dbin->current_group_id == GST_GROUP_ID_INVALID) {
769       GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT,
770           *group_id);
771       dbin->current_group_id = *group_id;
772     }
773     *group_id = dbin->current_group_id;
774     return TRUE;
775   }
776
777   return FALSE;
778 }
779
780 static void
781 parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
782 {
783   GstDecodebin3 *dbin = input->dbin;
784   gboolean all_drained;
785   GList *tmp;
786
787   GST_INFO_OBJECT (dbin, "input %p drained", input);
788   input->drained = TRUE;
789
790   all_drained = dbin->main_input->drained;
791   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
792     DecodebinInput *data = (DecodebinInput *) tmp->data;
793
794     all_drained &= data->drained;
795   }
796
797   if (all_drained) {
798     GST_INFO_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
799     g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
800         NULL);
801   }
802 }
803
804 /* Call with INPUT_LOCK taken */
805 static gboolean
806 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
807 {
808   gboolean set_state = FALSE;
809
810   if (input->parsebin == NULL) {
811     input->parsebin = gst_element_factory_make ("parsebin", NULL);
812     if (input->parsebin == NULL)
813       goto no_parsebin;
814     input->parsebin = gst_object_ref (input->parsebin);
815     input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
816     input->pad_added_sigid =
817         g_signal_connect (input->parsebin, "pad-added",
818         (GCallback) parsebin_pad_added_cb, input);
819     input->pad_removed_sigid =
820         g_signal_connect (input->parsebin, "pad-removed",
821         (GCallback) parsebin_pad_removed_cb, input);
822     input->drained_sigid =
823         g_signal_connect (input->parsebin, "drained",
824         (GCallback) parsebin_drained_cb, input);
825     g_signal_connect (input->parsebin, "autoplug-continue",
826         (GCallback) parsebin_autoplug_continue_cb, dbin);
827   }
828
829   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
830     /* The state lock is taken so that we ensure we are the one (de)activating
831      * parsebin. We need to do this to ensure any activation taking place in
832      * parsebin (including by elements doing upstream activation) are done
833      * within the same thread. */
834     GST_STATE_LOCK (input->parsebin);
835     gst_bin_add (GST_BIN (dbin), input->parsebin);
836     set_state = TRUE;
837   }
838
839   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
840       input->parsebin_sink);
841
842   if (set_state) {
843     gst_element_sync_state_with_parent (input->parsebin);
844     GST_STATE_UNLOCK (input->parsebin);
845   }
846
847   return TRUE;
848
849   /* ERRORS */
850 no_parsebin:
851   {
852     gst_element_post_message ((GstElement *) dbin,
853         gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
854     return FALSE;
855   }
856 }
857
858 static GstPadLinkReturn
859 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
860 {
861   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
862   GstPadLinkReturn res = GST_PAD_LINK_OK;
863   DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
864
865   g_return_val_if_fail (input, GST_PAD_LINK_REFUSED);
866
867   GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT
868       ". Creating parsebin if needed", pad);
869
870   INPUT_LOCK (dbin);
871   if (!ensure_input_parsebin (dbin, input))
872     res = GST_PAD_LINK_REFUSED;
873   INPUT_UNLOCK (dbin);
874
875   return res;
876 }
877
878 /* Drop duration query during _input_pad_unlink */
879 static GstPadProbeReturn
880 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
881     DecodebinInput * input)
882 {
883   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
884
885   if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
886     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
887     if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
888       GST_LOG_OBJECT (pad, "stop forwarding query duration");
889       ret = GST_PAD_PROBE_HANDLED;
890     }
891   }
892
893   return ret;
894 }
895
896 static void
897 gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
898 {
899   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
900   DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
901
902   g_return_if_fail (input);
903
904   GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT
905       ". Removing parsebin.", pad);
906
907   INPUT_LOCK (dbin);
908   if (input->parsebin == NULL) {
909     INPUT_UNLOCK (dbin);
910     return;
911   }
912
913   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) {
914     GstStreamCollection *collection = NULL;
915     gulong probe_id = gst_pad_add_probe (input->parsebin_sink,
916         GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
917         (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
918
919     /* Clear stream-collection corresponding to current INPUT and post new
920      * stream-collection message, if needed */
921     if (input->collection) {
922       gst_object_unref (input->collection);
923       input->collection = NULL;
924     }
925
926     SELECTION_LOCK (dbin);
927     collection = get_merged_collection (dbin);
928     if (collection && collection != dbin->collection) {
929       GstMessage *msg;
930       GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
931
932       if (dbin->collection)
933         gst_object_unref (dbin->collection);
934       dbin->collection = collection;
935       dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
936
937       msg =
938           gst_message_new_stream_collection ((GstObject *) dbin,
939           dbin->collection);
940
941       SELECTION_UNLOCK (dbin);
942       gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
943       update_requested_selection (dbin);
944     } else {
945       if (collection)
946         gst_object_unref (collection);
947       SELECTION_UNLOCK (dbin);
948     }
949
950     gst_bin_remove (GST_BIN (dbin), input->parsebin);
951     gst_element_set_state (input->parsebin, GST_STATE_NULL);
952     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
953     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
954     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
955     gst_pad_remove_probe (input->parsebin_sink, probe_id);
956     gst_object_unref (input->parsebin);
957     gst_object_unref (input->parsebin_sink);
958
959     input->parsebin = NULL;
960     input->parsebin_sink = NULL;
961
962     if (!input->is_main) {
963       dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
964       free_input_async (dbin, input);
965     }
966   }
967   INPUT_UNLOCK (dbin);
968   return;
969 }
970
971 static void
972 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
973 {
974   GST_DEBUG ("Freeing input %p", input);
975   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
976   gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
977   if (input->parsebin) {
978     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
979     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
980     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
981     gst_element_set_state (input->parsebin, GST_STATE_NULL);
982     gst_object_unref (input->parsebin);
983     gst_object_unref (input->parsebin_sink);
984   }
985   if (input->collection)
986     gst_object_unref (input->collection);
987   g_free (input);
988 }
989
990 static void
991 free_input_async (GstDecodebin3 * dbin, DecodebinInput * input)
992 {
993   GST_LOG_OBJECT (dbin, "pushing input %p on thread pool to free", input);
994   gst_element_call_async (GST_ELEMENT_CAST (dbin),
995       (GstElementCallAsyncFunc) free_input, input, NULL);
996 }
997
998 static gboolean
999 sink_query_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstQuery * query)
1000 {
1001   DecodebinInput *input =
1002       g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1003
1004   g_return_val_if_fail (input, FALSE);
1005
1006   GST_DEBUG_OBJECT (sinkpad, "query %" GST_PTR_FORMAT, query);
1007
1008   /* We accept any caps, since we will reconfigure ourself internally if the new
1009    * stream is incompatible */
1010   if (GST_QUERY_TYPE (query) == GST_QUERY_ACCEPT_CAPS) {
1011     GST_DEBUG_OBJECT (dbin, "Accepting ACCEPT_CAPS query");
1012     gst_query_set_accept_caps_result (query, TRUE);
1013     return TRUE;
1014   }
1015   return gst_pad_query_default (sinkpad, GST_OBJECT (dbin), query);
1016 }
1017
1018 static gboolean
1019 sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
1020 {
1021   DecodebinInput *input =
1022       g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1023
1024   g_return_val_if_fail (input, FALSE);
1025
1026   GST_DEBUG_OBJECT (sinkpad, "event %" GST_PTR_FORMAT, event);
1027
1028   switch (GST_EVENT_TYPE (event)) {
1029     case GST_EVENT_STREAM_START:
1030     {
1031       GstQuery *q = gst_query_new_selectable ();
1032
1033       /* Query whether upstream can handle stream selection or not */
1034       if (gst_pad_peer_query (sinkpad, q)) {
1035         gst_query_parse_selectable (q, &input->upstream_selected);
1036         GST_DEBUG_OBJECT (sinkpad, "Upstream is selectable : %d",
1037             input->upstream_selected);
1038       } else {
1039         input->upstream_selected = FALSE;
1040         GST_DEBUG_OBJECT (sinkpad, "Upstream does not handle SELECTABLE query");
1041       }
1042       gst_query_unref (q);
1043
1044       /* FIXME : We force `decodebin3` to upstream selection mode if *any* of the
1045          inputs is. This means things might break if there's a mix */
1046       if (input->upstream_selected)
1047         dbin->upstream_selected = TRUE;
1048       break;
1049     }
1050     case GST_EVENT_CAPS:
1051     {
1052       GST_DEBUG_OBJECT (sinkpad,
1053           "New caps, checking if they are compatible with existing parsebin");
1054       if (input->parsebin_sink) {
1055         GstCaps *newcaps = NULL;
1056
1057         gst_event_parse_caps (event, &newcaps);
1058         GST_DEBUG_OBJECT (sinkpad, "new caps %" GST_PTR_FORMAT, newcaps);
1059
1060         if (newcaps
1061             && !gst_pad_query_accept_caps (input->parsebin_sink, newcaps)) {
1062           GST_DEBUG_OBJECT (sinkpad,
1063               "Parsebin doesn't accept the new caps %" GST_PTR_FORMAT, newcaps);
1064           GST_STATE_LOCK (dbin);
1065           /* Reset parsebin so that it reconfigures itself for the new stream format */
1066           gst_element_set_state (input->parsebin, GST_STATE_NULL);
1067           gst_element_sync_state_with_parent (input->parsebin);
1068           GST_STATE_UNLOCK (dbin);
1069         } else {
1070           GST_DEBUG_OBJECT (sinkpad, "Parsebin accepts new caps");
1071         }
1072       }
1073       break;
1074     }
1075     default:
1076       break;
1077   }
1078
1079   /* Chain to parent function */
1080   return gst_pad_event_default (sinkpad, GST_OBJECT (dbin), event);
1081 }
1082
1083 /* Call with INPUT_LOCK taken */
1084 static DecodebinInput *
1085 create_new_input (GstDecodebin3 * dbin, gboolean main)
1086 {
1087   DecodebinInput *input;
1088
1089   input = g_new0 (DecodebinInput, 1);
1090   input->dbin = dbin;
1091   input->is_main = main;
1092   input->group_id = GST_GROUP_ID_INVALID;
1093   if (main)
1094     input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
1095   else {
1096     gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
1097     input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
1098     g_free (pad_name);
1099   }
1100   input->upstream_selected = FALSE;
1101   g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
1102   gst_pad_set_event_function (input->ghost_sink,
1103       (GstPadEventFunction) sink_event_function);
1104   gst_pad_set_query_function (input->ghost_sink,
1105       (GstPadQueryFunction) sink_query_function);
1106   gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
1107   gst_pad_set_unlink_function (input->ghost_sink,
1108       gst_decodebin3_input_pad_unlink);
1109
1110   gst_pad_set_active (input->ghost_sink, TRUE);
1111   gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
1112
1113   return input;
1114
1115 }
1116
1117 static GstPad *
1118 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
1119     const gchar * name, const GstCaps * caps)
1120 {
1121   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1122   DecodebinInput *input;
1123   GstPad *res = NULL;
1124
1125   /* We are ignoring names for the time being, not sure it makes any sense
1126    * within the context of decodebin3 ... */
1127   input = create_new_input (dbin, FALSE);
1128   if (input) {
1129     INPUT_LOCK (dbin);
1130     dbin->other_inputs = g_list_append (dbin->other_inputs, input);
1131     res = input->ghost_sink;
1132     INPUT_UNLOCK (dbin);
1133   }
1134
1135   return res;
1136 }
1137
1138 /* Must be called with factories lock! */
1139 static void
1140 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
1141 {
1142   guint cookie;
1143
1144   cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
1145   if (!dbin->factories || dbin->factories_cookie != cookie) {
1146     GList *tmp;
1147     if (dbin->factories)
1148       gst_plugin_feature_list_free (dbin->factories);
1149     if (dbin->decoder_factories)
1150       g_list_free (dbin->decoder_factories);
1151     if (dbin->decodable_factories)
1152       g_list_free (dbin->decodable_factories);
1153     dbin->factories =
1154         gst_element_factory_list_get_elements
1155         (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
1156     dbin->factories =
1157         g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
1158     dbin->factories_cookie = cookie;
1159
1160     /* Filter decoder and other decodables */
1161     dbin->decoder_factories = NULL;
1162     dbin->decodable_factories = NULL;
1163     for (tmp = dbin->factories; tmp; tmp = tmp->next) {
1164       GstElementFactory *fact = (GstElementFactory *) tmp->data;
1165       if (gst_element_factory_list_is_type (fact,
1166               GST_ELEMENT_FACTORY_TYPE_DECODER))
1167         dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
1168       else
1169         dbin->decodable_factories =
1170             g_list_append (dbin->decodable_factories, fact);
1171     }
1172   }
1173 }
1174
1175 /* Must be called with appropriate lock if list is a protected variable */
1176 static const gchar *
1177 stream_in_list (GList * list, const gchar * sid)
1178 {
1179   GList *tmp;
1180
1181 #if EXTRA_DEBUG
1182   for (tmp = list; tmp; tmp = tmp->next) {
1183     gchar *osid = (gchar *) tmp->data;
1184     GST_DEBUG ("Checking %s against %s", sid, osid);
1185   }
1186 #endif
1187
1188   for (tmp = list; tmp; tmp = tmp->next) {
1189     const gchar *osid = (gchar *) tmp->data;
1190     if (!g_strcmp0 (sid, osid))
1191       return osid;
1192   }
1193
1194   return NULL;
1195 }
1196
1197 static gboolean
1198 stream_list_equal (GList * lista, GList * listb)
1199 {
1200   GList *tmp;
1201
1202   if (g_list_length (lista) != g_list_length (listb))
1203     return FALSE;
1204
1205   for (tmp = lista; tmp; tmp = tmp->next) {
1206     gchar *osid = tmp->data;
1207     if (!stream_in_list (listb, osid))
1208       return FALSE;
1209   }
1210
1211   return TRUE;
1212 }
1213
1214 static void
1215 update_requested_selection (GstDecodebin3 * dbin)
1216 {
1217   guint i, nb;
1218   GList *tmp = NULL;
1219   gboolean all_user_selected = TRUE;
1220   GstStreamType used_types = 0;
1221   GstStreamCollection *collection;
1222
1223   /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1224    *  the switch handler will take care of the pending selection */
1225   SELECTION_LOCK (dbin);
1226   if (dbin->pending_select_streams) {
1227     GST_DEBUG_OBJECT (dbin,
1228         "No need to create pending selection, SELECT_STREAMS underway");
1229     goto beach;
1230   }
1231
1232   collection = dbin->collection;
1233   if (G_UNLIKELY (collection == NULL)) {
1234     GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
1235     goto beach;
1236   }
1237   nb = gst_stream_collection_get_size (collection);
1238
1239   /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1240   GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1241
1242   /* 3. If not, check if we already have some of the streams in the
1243    * existing active/requested selection */
1244   for (i = 0; i < nb; i++) {
1245     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1246     const gchar *sid = gst_stream_get_stream_id (stream);
1247     gint request = -1;
1248     /* Fire select-stream signal to see if outside components want to
1249      * hint at which streams should be selected */
1250     g_signal_emit (G_OBJECT (dbin),
1251         gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1252         &request);
1253     GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1254
1255     if (request == -1)
1256       all_user_selected = FALSE;
1257     if (request == 1 || (request == -1
1258             && (stream_in_list (dbin->requested_selection, sid)
1259                 || stream_in_list (dbin->active_selection, sid)))) {
1260       GstStreamType curtype = gst_stream_get_stream_type (stream);
1261       if (request == 1)
1262         GST_DEBUG_OBJECT (dbin,
1263             "Using stream requested by 'select-stream' signal : %s", sid);
1264       else
1265         GST_DEBUG_OBJECT (dbin,
1266             "Re-using stream already present in requested or active selection : %s",
1267             sid);
1268       tmp = g_list_append (tmp, (gchar *) sid);
1269       used_types |= curtype;
1270     }
1271   }
1272
1273   /* 4. If the user didn't explicitly selected all streams, match one stream of each type */
1274   if (!all_user_selected && dbin->select_streams_seqnum == GST_SEQNUM_INVALID) {
1275     for (i = 0; i < nb; i++) {
1276       GstStream *stream = gst_stream_collection_get_stream (collection, i);
1277       GstStreamType curtype = gst_stream_get_stream_type (stream);
1278       if (curtype != GST_STREAM_TYPE_UNKNOWN && !(used_types & curtype)) {
1279         const gchar *sid = gst_stream_get_stream_id (stream);
1280         GST_DEBUG_OBJECT (dbin,
1281             "Automatically selecting stream '%s' of type %s", sid,
1282             gst_stream_type_get_name (curtype));
1283         tmp = g_list_append (tmp, (gchar *) sid);
1284         used_types |= curtype;
1285       }
1286     }
1287   }
1288
1289 beach:
1290   if (stream_list_equal (tmp, dbin->requested_selection)) {
1291     /* If the selection is equal, there is nothign to do */
1292     GST_DEBUG_OBJECT (dbin, "Dropping duplicate selection");
1293     g_list_free (tmp);
1294     tmp = NULL;
1295   }
1296
1297   if (tmp) {
1298     /* Finally set the requested selection */
1299     if (dbin->requested_selection) {
1300       GST_FIXME_OBJECT (dbin,
1301           "Replacing non-NULL requested_selection, what should we do ??");
1302       g_list_free_full (dbin->requested_selection, g_free);
1303     }
1304     dbin->requested_selection =
1305         g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1306     dbin->selection_updated = TRUE;
1307     g_list_free (tmp);
1308   }
1309   SELECTION_UNLOCK (dbin);
1310 }
1311
1312 /* sort_streams:
1313  * GCompareFunc to use with lists of GstStream.
1314  * Sorts GstStreams by stream type and SELECT flag and stream-id
1315  * First video, then audio, then others.
1316  *
1317  * Return: negative if a<b, 0 if a==b, positive if a>b
1318  */
1319 static gint
1320 sort_streams (GstStream * sa, GstStream * sb)
1321 {
1322   GstStreamType typea, typeb;
1323   GstStreamFlags flaga, flagb;
1324   const gchar *ida, *idb;
1325   gint ret = 0;
1326
1327   typea = gst_stream_get_stream_type (sa);
1328   typeb = gst_stream_get_stream_type (sb);
1329
1330   GST_LOG ("sa(%s), sb(%s)", gst_stream_get_stream_id (sa),
1331       gst_stream_get_stream_id (sb));
1332
1333   /* Sort by stream type. First video, then audio, then others(text, container, unknown) */
1334   if (typea != typeb) {
1335     if (typea & GST_STREAM_TYPE_VIDEO)
1336       ret = -1;
1337     else if (typea & GST_STREAM_TYPE_AUDIO)
1338       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)) ? -1 : 1;
1339     else if (typea & GST_STREAM_TYPE_TEXT)
1340       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)
1341           && !(typeb & GST_STREAM_TYPE_AUDIO)) ? -1 : 1;
1342     else if (typea & GST_STREAM_TYPE_CONTAINER)
1343       ret = (typeb & GST_STREAM_TYPE_UNKNOWN) ? -1 : 1;
1344     else
1345       ret = 1;
1346
1347     if (ret != 0) {
1348       GST_LOG ("Sort by stream-type: %d", ret);
1349       return ret;
1350     }
1351   }
1352
1353   /* Sort by SELECT flag, if stream type is same. */
1354   flaga = gst_stream_get_stream_flags (sa);
1355   flagb = gst_stream_get_stream_flags (sb);
1356
1357   ret =
1358       (flaga & GST_STREAM_FLAG_SELECT) ? ((flagb & GST_STREAM_FLAG_SELECT) ? 0 :
1359       -1) : ((flagb & GST_STREAM_FLAG_SELECT) ? 1 : 0);
1360
1361   if (ret != 0) {
1362     GST_LOG ("Sort by SELECT flag: %d", ret);
1363     return ret;
1364   }
1365
1366   /* Sort by stream-id, if otherwise the same. */
1367   ida = gst_stream_get_stream_id (sa);
1368   idb = gst_stream_get_stream_id (sb);
1369   ret = g_strcmp0 (ida, idb);
1370
1371   GST_LOG ("Sort by stream-id: %d", ret);
1372
1373   return ret;
1374 }
1375
1376 /* Call with INPUT_LOCK taken */
1377 static GstStreamCollection *
1378 get_merged_collection (GstDecodebin3 * dbin)
1379 {
1380   gboolean needs_merge = FALSE;
1381   GstStreamCollection *res = NULL;
1382   GList *tmp;
1383   GList *unsorted_streams = NULL;
1384   guint i, nb_stream;
1385
1386   /* First check if we need to do a merge or just return the only collection */
1387   res = dbin->main_input->collection;
1388
1389   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1390     DecodebinInput *input = (DecodebinInput *) tmp->data;
1391     if (input->collection) {
1392       if (res) {
1393         needs_merge = TRUE;
1394         break;
1395       }
1396       res = input->collection;
1397     }
1398   }
1399
1400   if (!needs_merge) {
1401     GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1402     return res ? gst_object_ref (res) : NULL;
1403   }
1404
1405   /* We really need to create a new collection */
1406   /* FIXME : Some numbering scheme maybe ?? */
1407   res = gst_stream_collection_new ("decodebin3");
1408   if (dbin->main_input->collection) {
1409     nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1410     GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1411     for (i = 0; i < nb_stream; i++) {
1412       GstStream *stream =
1413           gst_stream_collection_get_stream (dbin->main_input->collection, i);
1414       unsorted_streams = g_list_append (unsorted_streams, stream);
1415     }
1416   }
1417
1418   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1419     DecodebinInput *input = (DecodebinInput *) tmp->data;
1420     GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1421         input->collection);
1422     if (input->collection) {
1423       nb_stream = gst_stream_collection_get_size (input->collection);
1424       GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1425       for (i = 0; i < nb_stream; i++) {
1426         GstStream *stream =
1427             gst_stream_collection_get_stream (input->collection, i);
1428         /* Only add if not already present in the list */
1429         if (!g_list_find (unsorted_streams, stream))
1430           unsorted_streams = g_list_append (unsorted_streams, stream);
1431       }
1432     }
1433   }
1434
1435   /* re-order streams : video, then audio, then others */
1436   unsorted_streams =
1437       g_list_sort (unsorted_streams, (GCompareFunc) sort_streams);
1438   for (tmp = unsorted_streams; tmp; tmp = tmp->next) {
1439     GstStream *stream = (GstStream *) tmp->data;
1440     GST_DEBUG_OBJECT (dbin, "Adding #stream(%s) to collection",
1441         gst_stream_get_stream_id (stream));
1442     gst_stream_collection_add_stream (res, gst_object_ref (stream));
1443   }
1444
1445   if (unsorted_streams)
1446     g_list_free (unsorted_streams);
1447
1448   return res;
1449 }
1450
1451 /* Call with INPUT_LOCK taken */
1452 static DecodebinInput *
1453 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1454 {
1455   DecodebinInput *input = NULL;
1456   GstElement *parent = gst_object_ref (child);
1457   GList *tmp;
1458
1459   do {
1460     GstElement *next_parent;
1461
1462     GST_DEBUG_OBJECT (dbin, "parent %s",
1463         parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1464
1465     if (parent == dbin->main_input->parsebin) {
1466       input = dbin->main_input;
1467       break;
1468     }
1469     for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1470       DecodebinInput *cur = (DecodebinInput *) tmp->data;
1471       if (parent == cur->parsebin) {
1472         input = cur;
1473         break;
1474       }
1475     }
1476     next_parent = (GstElement *) gst_element_get_parent (parent);
1477     gst_object_unref (parent);
1478     parent = next_parent;
1479
1480   } while (parent && parent != (GstElement *) dbin);
1481
1482   if (parent)
1483     gst_object_unref (parent);
1484
1485   return input;
1486 }
1487
1488 static const gchar *
1489 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1490 {
1491   guint i, len;
1492
1493   if (dbin->collection == NULL)
1494     return NULL;
1495   len = gst_stream_collection_get_size (dbin->collection);
1496   for (i = 0; i < len; i++) {
1497     GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1498     const gchar *osid = gst_stream_get_stream_id (stream);
1499     if (!g_strcmp0 (sid, osid))
1500       return osid;
1501   }
1502
1503   return NULL;
1504 }
1505
1506 /* Call with INPUT_LOCK taken */
1507 static void
1508 handle_stream_collection (GstDecodebin3 * dbin,
1509     GstStreamCollection * collection, DecodebinInput * input)
1510 {
1511 #ifndef GST_DISABLE_GST_DEBUG
1512   const gchar *upstream_id;
1513   guint i;
1514 #endif
1515   if (!input) {
1516     GST_DEBUG_OBJECT (dbin,
1517         "Couldn't find corresponding input, most likely shutting down");
1518     return;
1519   }
1520
1521   /* Replace collection in input */
1522   if (input->collection)
1523     gst_object_unref (input->collection);
1524   input->collection = gst_object_ref (collection);
1525   GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1526       input);
1527
1528   /* Merge collection if needed */
1529   collection = get_merged_collection (dbin);
1530
1531 #ifndef GST_DISABLE_GST_DEBUG
1532   /* Just some debugging */
1533   upstream_id = gst_stream_collection_get_upstream_id (collection);
1534   GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1535   GST_DEBUG ("From input %p", input);
1536   GST_DEBUG ("  %d streams", gst_stream_collection_get_size (collection));
1537   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1538     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1539     GstTagList *taglist;
1540     GstCaps *caps;
1541
1542     GST_DEBUG ("   Stream '%s'", gst_stream_get_stream_id (stream));
1543     GST_DEBUG ("     type  : %s",
1544         gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1545     GST_DEBUG ("     flags : 0x%x", gst_stream_get_stream_flags (stream));
1546     taglist = gst_stream_get_tags (stream);
1547     GST_DEBUG ("     tags  : %" GST_PTR_FORMAT, taglist);
1548     caps = gst_stream_get_caps (stream);
1549     GST_DEBUG ("     caps  : %" GST_PTR_FORMAT, caps);
1550     if (taglist)
1551       gst_tag_list_unref (taglist);
1552     if (caps)
1553       gst_caps_unref (caps);
1554   }
1555 #endif
1556
1557   /* Store collection for later usage */
1558   SELECTION_LOCK (dbin);
1559   if (dbin->collection == NULL) {
1560     dbin->collection = collection;
1561   } else {
1562     /* We need to check who emitted this collection (the owner).
1563      * If we already had a collection from that user, this one is an update,
1564      * that is to say that we need to figure out how we are going to re-use
1565      * the streams/slot */
1566     GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1567     /* FIXME : When do we switch from pending collection to active collection ?
1568      * When all streams from active collection are drained in multiqueue output ? */
1569     gst_object_unref (dbin->collection);
1570     dbin->collection = collection;
1571     /* dbin->pending_collection = */
1572     /*     g_list_append (dbin->pending_collection, collection); */
1573   }
1574   dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1575   SELECTION_UNLOCK (dbin);
1576 }
1577
1578 /* Must be called with the selection lock taken */
1579 static void
1580 gst_decodebin3_update_min_interleave (GstDecodebin3 * dbin)
1581 {
1582   GstClockTime max_latency = GST_CLOCK_TIME_NONE;
1583   GList *tmp;
1584
1585   GST_DEBUG_OBJECT (dbin, "Recalculating max latency of decoders");
1586   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1587     DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1588     if (GST_CLOCK_TIME_IS_VALID (out->decoder_latency)) {
1589       if (max_latency == GST_CLOCK_TIME_NONE
1590           || out->decoder_latency > max_latency)
1591         max_latency = out->decoder_latency;
1592     }
1593   }
1594   GST_DEBUG_OBJECT (dbin, "max latency of all decoders: %" GST_TIME_FORMAT,
1595       GST_TIME_ARGS (max_latency));
1596
1597   if (!GST_CLOCK_TIME_IS_VALID (max_latency))
1598     return;
1599
1600   /* Make sure we keep an extra overhead */
1601   max_latency += 100 * GST_MSECOND;
1602   if (max_latency == dbin->current_mq_min_interleave)
1603     return;
1604
1605   dbin->current_mq_min_interleave = max_latency;
1606   GST_DEBUG_OBJECT (dbin, "Setting mq min-interleave to %" GST_TIME_FORMAT,
1607       GST_TIME_ARGS (dbin->current_mq_min_interleave));
1608   g_object_set (dbin->multiqueue, "min-interleave-time",
1609       dbin->current_mq_min_interleave, NULL);
1610 }
1611
1612 static void
1613 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1614 {
1615   GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1616   gboolean posting_collection = FALSE;
1617
1618   GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1619
1620   switch (GST_MESSAGE_TYPE (message)) {
1621     case GST_MESSAGE_STREAM_COLLECTION:
1622     {
1623       GstStreamCollection *collection = NULL;
1624       DecodebinInput *input;
1625
1626       INPUT_LOCK (dbin);
1627       input =
1628           find_message_parsebin (dbin,
1629           (GstElement *) GST_MESSAGE_SRC (message));
1630       if (input == NULL) {
1631         GST_DEBUG_OBJECT (dbin,
1632             "Couldn't find corresponding input, most likely shutting down");
1633         INPUT_UNLOCK (dbin);
1634         break;
1635       }
1636       if (input->upstream_selected) {
1637         GST_DEBUG_OBJECT (dbin,
1638             "Upstream handles selection, not using/forwarding collection");
1639         INPUT_UNLOCK (dbin);
1640         goto drop_message;
1641       }
1642       gst_message_parse_stream_collection (message, &collection);
1643       if (collection) {
1644         handle_stream_collection (dbin, collection, input);
1645         posting_collection = TRUE;
1646       }
1647       INPUT_UNLOCK (dbin);
1648
1649       SELECTION_LOCK (dbin);
1650       if (dbin->collection) {
1651         /* Replace collection message, we most likely aggregated it */
1652         GstMessage *new_msg;
1653         new_msg =
1654             gst_message_new_stream_collection ((GstObject *) dbin,
1655             dbin->collection);
1656         gst_message_unref (message);
1657         message = new_msg;
1658       }
1659       SELECTION_UNLOCK (dbin);
1660
1661       if (collection)
1662         gst_object_unref (collection);
1663       break;
1664     }
1665     case GST_MESSAGE_LATENCY:
1666     {
1667       GList *tmp;
1668       /* Check if this is from one of our decoders */
1669       SELECTION_LOCK (dbin);
1670       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1671         DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1672         if (out->decoder == (GstElement *) GST_MESSAGE_SRC (message)) {
1673           GstClockTime min, max;
1674           if (GST_IS_VIDEO_DECODER (out->decoder)) {
1675             gst_video_decoder_get_latency (GST_VIDEO_DECODER (out->decoder),
1676                 &min, &max);
1677             GST_DEBUG_OBJECT (dbin,
1678                 "Got latency update from one of our decoders. min: %"
1679                 GST_TIME_FORMAT " max: %" GST_TIME_FORMAT, GST_TIME_ARGS (min),
1680                 GST_TIME_ARGS (max));
1681             out->decoder_latency = min;
1682             /* Trigger recalculation */
1683             gst_decodebin3_update_min_interleave (dbin);
1684           }
1685           break;
1686         }
1687       }
1688       SELECTION_UNLOCK (dbin);
1689     }
1690     default:
1691       break;
1692   }
1693
1694   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1695
1696   if (posting_collection) {
1697     /* Figure out a selection for that collection */
1698     update_requested_selection (dbin);
1699   }
1700
1701   return;
1702
1703 drop_message:
1704   {
1705     GST_DEBUG_OBJECT (bin, "dropping message");
1706     gst_message_unref (message);
1707   }
1708 }
1709
1710 static DecodebinOutputStream *
1711 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1712 {
1713   GList *tmp;
1714   GstStreamType stype = gst_stream_get_stream_type (stream);
1715
1716   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1717     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1718     if (output->type == stype && output->slot && output->slot->active_stream) {
1719       GstStream *tstream = output->slot->active_stream;
1720       if (!stream_in_list (dbin->requested_selection,
1721               (gchar *) gst_stream_get_stream_id (tstream))) {
1722         return output;
1723       }
1724     }
1725   }
1726
1727   return NULL;
1728 }
1729
1730 /* Give a certain slot, figure out if it should be linked to an
1731  * output stream
1732  * CALL WITH SELECTION LOCK TAKEN !*/
1733 static DecodebinOutputStream *
1734 get_output_for_slot (MultiQueueSlot * slot)
1735 {
1736   GstDecodebin3 *dbin = slot->dbin;
1737   DecodebinOutputStream *output = NULL;
1738   const gchar *stream_id;
1739   GstCaps *caps;
1740   gchar *id_in_list = NULL;
1741
1742   /* If we already have a configured output, just use it */
1743   if (slot->output != NULL)
1744     return slot->output;
1745
1746   /*
1747    * FIXME
1748    *
1749    * This method needs to be split into multiple parts
1750    *
1751    * 1) Figure out whether stream should be exposed or not
1752    *   This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1753    *   in the default stream attribution
1754    *
1755    * 2) Figure out whether an output stream should be created, whether
1756    *   we can re-use the output stream already linked to the slot, or
1757    *   whether we need to get re-assigned another (currently used) output
1758    *   stream.
1759    */
1760
1761   stream_id = gst_stream_get_stream_id (slot->active_stream);
1762   caps = gst_stream_get_caps (slot->active_stream);
1763   GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
1764   gst_caps_unref (caps);
1765
1766   /* 0. Emit autoplug-continue signal for pending caps ? */
1767   GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
1768
1769   /* 1. if in EXPOSE_ALL_MODE, just accept */
1770   GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
1771
1772 #if 0
1773   /* FIXME : The idea around this was to avoid activating a stream for
1774    *     which we have no decoder. Unfortunately it is way too
1775    *     expensive. Need to figure out a better solution */
1776   /* 2. Is there a potential decoder (if one is required) */
1777   if (!gst_caps_can_intersect (caps, dbin->caps)
1778       && !have_factory (dbin, (GstCaps *) caps,
1779           GST_ELEMENT_FACTORY_TYPE_DECODER)) {
1780     GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
1781         caps);
1782     SELECTION_UNLOCK (dbin);
1783     gst_element_post_message (GST_ELEMENT_CAST (dbin),
1784         gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1785     SELECTION_LOCK (dbin);
1786     return NULL;
1787   }
1788 #endif
1789
1790   /* 3. In default mode check if we should expose */
1791   id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
1792   if (id_in_list || dbin->upstream_selected) {
1793     /* Check if we can steal an existing output stream we could re-use.
1794      * that is:
1795      * * an output stream whose slot->stream is not in requested
1796      * * and is of the same type as this stream
1797      */
1798     output = find_free_compatible_output (dbin, slot->active_stream);
1799     if (output) {
1800       /* Move this output from its current slot to this slot */
1801       dbin->to_activate =
1802           g_list_append (dbin->to_activate, (gchar *) stream_id);
1803       dbin->requested_selection =
1804           g_list_remove (dbin->requested_selection, id_in_list);
1805       g_free (id_in_list);
1806       SELECTION_UNLOCK (dbin);
1807       gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1808           (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
1809       SELECTION_LOCK (dbin);
1810       return NULL;
1811     }
1812
1813     output = create_output_stream (dbin, slot->type);
1814     output->slot = slot;
1815     GST_DEBUG ("Linking slot %p to new output %p", slot, output);
1816     slot->output = output;
1817     GST_DEBUG ("Adding '%s' to active_selection", stream_id);
1818     dbin->active_selection =
1819         g_list_append (dbin->active_selection, (gchar *) g_strdup (stream_id));
1820   } else
1821     GST_DEBUG ("Not creating any output for slot %p", slot);
1822
1823   return output;
1824 }
1825
1826 /* Returns SELECTED_STREAMS message if active_selection is equal to
1827  * requested_selection, else NULL.
1828  * Must be called with LOCK taken */
1829 static GstMessage *
1830 is_selection_done (GstDecodebin3 * dbin)
1831 {
1832   GList *tmp;
1833   GstMessage *msg;
1834
1835   if (!dbin->selection_updated)
1836     return NULL;
1837
1838   GST_LOG_OBJECT (dbin, "Checking");
1839
1840   if (dbin->to_activate != NULL) {
1841     GST_DEBUG ("Still have streams to activate");
1842     return NULL;
1843   }
1844   for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
1845     GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
1846     if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
1847       GST_DEBUG ("Not in active selection, returning");
1848       return NULL;
1849     }
1850   }
1851
1852   GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
1853
1854   /* We are completely active */
1855   msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
1856   if (dbin->select_streams_seqnum != GST_SEQNUM_INVALID) {
1857     gst_message_set_seqnum (msg, dbin->select_streams_seqnum);
1858   }
1859   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1860     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1861     if (output->slot) {
1862       GST_DEBUG_OBJECT (dbin, "Adding stream %s",
1863           gst_stream_get_stream_id (output->slot->active_stream));
1864
1865       gst_message_streams_selected_add (msg, output->slot->active_stream);
1866     } else
1867       GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
1868   }
1869   dbin->selection_updated = FALSE;
1870   return msg;
1871 }
1872
1873 /* Must be called with SELECTION_LOCK taken */
1874 static void
1875 check_all_slot_for_eos (GstDecodebin3 * dbin, GstEvent * ev)
1876 {
1877   gboolean all_drained = TRUE;
1878   GList *iter;
1879
1880   GST_DEBUG_OBJECT (dbin, "check slot for eos");
1881
1882   for (iter = dbin->slots; iter; iter = iter->next) {
1883     MultiQueueSlot *slot = iter->data;
1884
1885     if (!slot->output)
1886       continue;
1887
1888     if (slot->is_drained) {
1889       GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
1890       continue;
1891     }
1892
1893     all_drained = FALSE;
1894     break;
1895   }
1896
1897   if (all_drained) {
1898     INPUT_LOCK (dbin);
1899     if (!pending_pads_are_eos (dbin->main_input))
1900       all_drained = FALSE;
1901
1902     if (all_drained) {
1903       for (iter = dbin->other_inputs; iter; iter = iter->next) {
1904         if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
1905           all_drained = FALSE;
1906           break;
1907         }
1908       }
1909     }
1910     INPUT_UNLOCK (dbin);
1911   }
1912
1913   if (all_drained) {
1914     GST_DEBUG_OBJECT (dbin,
1915         "All active slots are drained, and no pending input, push EOS");
1916
1917     for (iter = dbin->input_streams; iter; iter = iter->next) {
1918       DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
1919       GstPad *peer = gst_pad_get_peer (input->srcpad);
1920
1921       /* Send EOS to all slots */
1922       if (peer) {
1923         GstEvent *stream_start, *eos;
1924
1925         stream_start =
1926             gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
1927
1928         /* First forward a custom STREAM_START event to reset the EOS status (if any) */
1929         if (stream_start) {
1930           GstStructure *s;
1931           GstEvent *custom_stream_start = gst_event_copy (stream_start);
1932           gst_event_unref (stream_start);
1933           s = (GstStructure *) gst_event_get_structure (custom_stream_start);
1934           gst_structure_set (s, "decodebin3-flushing-stream-start",
1935               G_TYPE_BOOLEAN, TRUE, NULL);
1936           gst_pad_send_event (peer, custom_stream_start);
1937         }
1938
1939         eos = gst_event_new_eos ();
1940         gst_event_set_seqnum (eos, gst_event_get_seqnum (ev));
1941         gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos),
1942             CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA,
1943             NULL);
1944         gst_pad_send_event (peer, eos);
1945         gst_object_unref (peer);
1946       } else
1947         GST_DEBUG_OBJECT (dbin, "no output");
1948     }
1949   }
1950 }
1951
1952 static GstPadProbeReturn
1953 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
1954     MultiQueueSlot * slot)
1955 {
1956   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
1957   GstDecodebin3 *dbin = slot->dbin;
1958
1959   if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
1960     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
1961
1962     GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
1963     switch (GST_EVENT_TYPE (ev)) {
1964       case GST_EVENT_STREAM_START:
1965       {
1966         GstStream *stream = NULL;
1967         const GstStructure *s = gst_event_get_structure (ev);
1968
1969         /* Drop STREAM_START events used to cleanup multiqueue */
1970         if (s
1971             && gst_structure_has_field (s,
1972                 "decodebin3-flushing-stream-start")) {
1973           ret = GST_PAD_PROBE_HANDLED;
1974           gst_event_unref (ev);
1975           break;
1976         }
1977
1978         gst_event_parse_stream (ev, &stream);
1979         if (stream == NULL) {
1980           GST_ERROR_OBJECT (pad,
1981               "Got a STREAM_START event without a GstStream");
1982           break;
1983         }
1984         slot->is_drained = FALSE;
1985         GST_DEBUG_OBJECT (pad, "Stream Start '%s'",
1986             gst_stream_get_stream_id (stream));
1987         if (slot->active_stream == NULL) {
1988           slot->active_stream = stream;
1989         } else if (slot->active_stream != stream) {
1990           GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
1991               gst_stream_get_stream_id (slot->active_stream),
1992               gst_stream_get_stream_id (stream));
1993           gst_object_unref (slot->active_stream);
1994           slot->active_stream = stream;
1995         } else
1996           gst_object_unref (stream);
1997 #if 0                           /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
1998         {
1999           gboolean is_active, is_requested;
2000           /* Quick check to see if we're in the current selection */
2001           /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
2002           SELECTION_LOCK (dbin);
2003           GST_DEBUG_OBJECT (dbin, "Checking active selection");
2004           is_active = stream_in_list (dbin->active_selection, stream_id);
2005           GST_DEBUG_OBJECT (dbin, "Checking requested selection");
2006           is_requested = stream_in_list (dbin->requested_selection, stream_id);
2007           SELECTION_UNLOCK (dbin);
2008           if (is_active)
2009             GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
2010                 slot->output);
2011           if (is_requested)
2012             GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
2013                 slot->output);
2014           else if (slot->output) {
2015             GST_DEBUG_OBJECT (pad,
2016                 "Slot needs to be deactivated ? It's no longer in requested selection");
2017           } else if (!is_active)
2018             GST_DEBUG_OBJECT (pad,
2019                 "Slot in neither active nor requested selection");
2020         }
2021 #endif
2022       }
2023         break;
2024       case GST_EVENT_CAPS:
2025       {
2026         /* Configure the output slot if needed */
2027         DecodebinOutputStream *output;
2028         GstMessage *msg = NULL;
2029         SELECTION_LOCK (dbin);
2030         output = get_output_for_slot (slot);
2031         if (output) {
2032           reconfigure_output_stream (output, slot);
2033           msg = is_selection_done (dbin);
2034         }
2035         SELECTION_UNLOCK (dbin);
2036         if (msg)
2037           gst_element_post_message ((GstElement *) slot->dbin, msg);
2038       }
2039         break;
2040       case GST_EVENT_EOS:
2041       {
2042         gboolean was_drained = slot->is_drained;
2043         slot->is_drained = TRUE;
2044
2045         /* Custom EOS handling first */
2046         if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2047                 CUSTOM_EOS_QUARK)) {
2048           /* remove custom-eos */
2049           ev = gst_event_make_writable (ev);
2050           GST_PAD_PROBE_INFO_DATA (info) = ev;
2051           gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
2052               CUSTOM_EOS_QUARK, NULL, NULL);
2053
2054           GST_LOG_OBJECT (pad, "Received custom EOS");
2055           ret = GST_PAD_PROBE_HANDLED;
2056           SELECTION_LOCK (dbin);
2057           if (slot->input == NULL) {
2058             GST_DEBUG_OBJECT (pad,
2059                 "Got custom-eos from null input stream, remove output stream");
2060             /* Remove the output */
2061             if (slot->output) {
2062               DecodebinOutputStream *output = slot->output;
2063               dbin->output_streams =
2064                   g_list_remove (dbin->output_streams, output);
2065               free_output_stream (dbin, output);
2066               /* Reacalculate min interleave */
2067               gst_decodebin3_update_min_interleave (dbin);
2068             }
2069             slot->probe_id = 0;
2070             dbin->slots = g_list_remove (dbin->slots, slot);
2071             free_multiqueue_slot_async (dbin, slot);
2072             ret = GST_PAD_PROBE_REMOVE;
2073           } else if (!was_drained) {
2074             check_all_slot_for_eos (dbin, ev);
2075           }
2076           if (ret == GST_PAD_PROBE_HANDLED)
2077             gst_event_unref (ev);
2078           SELECTION_UNLOCK (dbin);
2079           break;
2080         }
2081
2082         GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
2083             slot->input);
2084         if (slot->input == NULL) {
2085           GstPad *peer;
2086           GST_DEBUG_OBJECT (pad,
2087               "last EOS for input, forwarding and removing slot");
2088           peer = gst_pad_get_peer (pad);
2089           if (peer) {
2090             gst_pad_send_event (peer, gst_event_ref (ev));
2091             gst_object_unref (peer);
2092           }
2093           SELECTION_LOCK (dbin);
2094           /* FIXME : Shouldn't we try to re-assign the output instead of just
2095            * removing it ? */
2096           /* Remove the output */
2097           if (slot->output) {
2098             DecodebinOutputStream *output = slot->output;
2099             dbin->output_streams = g_list_remove (dbin->output_streams, output);
2100             free_output_stream (dbin, output);
2101           }
2102           slot->probe_id = 0;
2103           dbin->slots = g_list_remove (dbin->slots, slot);
2104           SELECTION_UNLOCK (dbin);
2105
2106           /* FIXME: Removing the slot is async, which means actually
2107            * unlinking the pad is async. Other things like stream-start
2108            * might flow through this (now unprobed) link before it actually
2109            * gets released */
2110           free_multiqueue_slot_async (dbin, slot);
2111           ret = GST_PAD_PROBE_REMOVE;
2112         } else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2113                 CUSTOM_FINAL_EOS_QUARK)) {
2114           GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream");
2115         } else {
2116           GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)");
2117           /* drop current event as eos will be sent in check_all_slot_for_eos
2118            * when all output streams are also eos */
2119           ret = GST_PAD_PROBE_DROP;
2120           SELECTION_LOCK (dbin);
2121           check_all_slot_for_eos (dbin, ev);
2122           SELECTION_UNLOCK (dbin);
2123         }
2124       }
2125         break;
2126       default:
2127         break;
2128     }
2129   } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
2130     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
2131     switch (GST_QUERY_TYPE (query)) {
2132       case GST_QUERY_CAPS:
2133       {
2134         GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
2135         gst_query_set_caps_result (query, GST_CAPS_ANY);
2136         ret = GST_PAD_PROBE_HANDLED;
2137       }
2138         break;
2139
2140       case GST_QUERY_ACCEPT_CAPS:
2141       {
2142         GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
2143         /* If the current decoder doesn't accept caps, we'll reconfigure
2144          * on the actual caps event. So accept any caps. */
2145         gst_query_set_accept_caps_result (query, TRUE);
2146         ret = GST_PAD_PROBE_HANDLED;
2147       }
2148       default:
2149         break;
2150     }
2151   }
2152
2153   return ret;
2154 }
2155
2156 /* Create a new multiqueue slot for the given type
2157  *
2158  * It is up to the caller to know whether that slot is needed or not
2159  * (and release it when no longer needed) */
2160 static MultiQueueSlot *
2161 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
2162 {
2163   MultiQueueSlot *slot;
2164   GstIterator *it = NULL;
2165   GValue item = { 0, };
2166
2167   GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
2168       gst_stream_type_get_name (type));
2169   slot = g_new0 (MultiQueueSlot, 1);
2170   slot->dbin = dbin;
2171
2172   slot->id = dbin->slot_id++;
2173
2174   slot->type = type;
2175   slot->sink_pad = gst_element_request_pad_simple (dbin->multiqueue, "sink_%u");
2176   if (slot->sink_pad == NULL)
2177     goto fail;
2178
2179   it = gst_pad_iterate_internal_links (slot->sink_pad);
2180   if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
2181       || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
2182     GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
2183         GST_DEBUG_PAD_NAME (slot->src_pad));
2184     goto fail;
2185   }
2186   gst_iterator_free (it);
2187   g_value_reset (&item);
2188
2189   g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
2190
2191   /* Add event probe */
2192   slot->probe_id =
2193       gst_pad_add_probe (slot->src_pad,
2194       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2195       (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
2196
2197   GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
2198       GST_DEBUG_PAD_NAME (slot->src_pad));
2199
2200   dbin->slots = g_list_append (dbin->slots, slot);
2201
2202   return slot;
2203
2204   /* ERRORS */
2205 fail:
2206   {
2207     if (slot->sink_pad)
2208       gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2209     g_free (slot);
2210     return NULL;
2211   }
2212 }
2213
2214 /* Must be called with SELECTION_LOCK */
2215 static MultiQueueSlot *
2216 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
2217 {
2218   GList *tmp;
2219   MultiQueueSlot *empty_slot = NULL;
2220   GstStreamType input_type = 0;
2221   gchar *stream_id = NULL;
2222
2223   GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
2224       input, input->active_stream,
2225       input->
2226       active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
2227
2228   if (input->active_stream) {
2229     input_type = gst_stream_get_stream_type (input->active_stream);
2230     stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
2231   }
2232
2233   /* Go over existing slots and check if there is already one for it */
2234   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2235     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2236     /* Already used input, return that one */
2237     if (slot->input == input) {
2238       GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
2239       return slot;
2240     }
2241   }
2242
2243   /* Go amongst all unused slots of the right type and try to find a candidate */
2244   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2245     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2246     if (slot->input == NULL && input_type == slot->type) {
2247       /* Remember this empty slot for later */
2248       empty_slot = slot;
2249       /* Check if available slot is of the same stream_id */
2250       GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
2251           slot->id, slot->active_stream);
2252       if (stream_id && slot->active_stream) {
2253         gchar *ostream_id =
2254             (gchar *) gst_stream_get_stream_id (slot->active_stream);
2255         GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
2256             ostream_id, stream_id);
2257         if (!g_strcmp0 (stream_id, ostream_id))
2258           break;
2259       }
2260     }
2261   }
2262
2263   if (empty_slot) {
2264     GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
2265     empty_slot->input = input;
2266     return empty_slot;
2267   }
2268
2269   if (input_type)
2270     return create_new_slot (dbin, input_type);
2271
2272   return NULL;
2273 }
2274
2275 static void
2276 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
2277 {
2278   if (slot->input != NULL && slot->input != input) {
2279     GST_ERROR_OBJECT (slot->dbin,
2280         "Trying to link input to an already used slot");
2281     return;
2282   }
2283   gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
2284   slot->pending_stream = input->active_stream;
2285   slot->input = input;
2286 }
2287
2288 #if 0
2289 static gboolean
2290 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
2291     GstElementFactoryListType ftype)
2292 {
2293   gboolean ret = FALSE;
2294   GList *res;
2295
2296   g_mutex_lock (&dbin->factories_lock);
2297   gst_decode_bin_update_factories_list (dbin);
2298   if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
2299     res =
2300         gst_element_factory_list_filter (dbin->decoder_factories,
2301         caps, GST_PAD_SINK, TRUE);
2302   else
2303     res =
2304         gst_element_factory_list_filter (dbin->decodable_factories,
2305         caps, GST_PAD_SINK, TRUE);
2306   g_mutex_unlock (&dbin->factories_lock);
2307
2308   if (res) {
2309     ret = TRUE;
2310     gst_plugin_feature_list_free (res);
2311   }
2312
2313   return ret;
2314 }
2315 #endif
2316
2317 static GList *
2318 create_decoder_factory_list (GstDecodebin3 * dbin, GstCaps * caps)
2319 {
2320   GList *res;
2321
2322   g_mutex_lock (&dbin->factories_lock);
2323   gst_decode_bin_update_factories_list (dbin);
2324   res = gst_element_factory_list_filter (dbin->decoder_factories,
2325       caps, GST_PAD_SINK, TRUE);
2326   g_mutex_unlock (&dbin->factories_lock);
2327   return res;
2328 }
2329
2330 static GstPadProbeReturn
2331 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
2332     DecodebinOutputStream * output)
2333 {
2334   GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2335   /* If we have a keyframe, remove the probe and let all data through */
2336   /* FIXME : HANDLE HEADER BUFFER ?? */
2337   if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
2338       GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
2339     GST_DEBUG_OBJECT (pad,
2340         "Buffer is keyframe or header, letting through and removing probe");
2341     output->drop_probe_id = 0;
2342     return GST_PAD_PROBE_REMOVE;
2343   }
2344   GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
2345   return GST_PAD_PROBE_DROP;
2346 }
2347
2348 static void
2349 reconfigure_output_stream (DecodebinOutputStream * output,
2350     MultiQueueSlot * slot)
2351 {
2352   GstDecodebin3 *dbin = output->dbin;
2353   GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
2354   gboolean needs_decoder;
2355
2356   needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
2357
2358   GST_DEBUG_OBJECT (dbin,
2359       "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
2360       needs_decoder);
2361
2362   /* FIXME : Maybe make the output un-hook itself automatically ? */
2363   if (output->slot != NULL && output->slot != slot) {
2364     GST_WARNING_OBJECT (dbin,
2365         "Output still linked to another slot (%p)", output->slot);
2366     gst_caps_unref (new_caps);
2367     return;
2368   }
2369
2370   /* Check if existing config is reusable as-is by checking if
2371    * the existing decoder accepts the new caps, if not delete
2372    * it and create a new one */
2373   if (output->decoder) {
2374     gboolean can_reuse_decoder;
2375
2376     if (needs_decoder) {
2377       can_reuse_decoder =
2378           gst_pad_query_accept_caps (output->decoder_sink, new_caps);
2379     } else
2380       can_reuse_decoder = FALSE;
2381
2382     if (can_reuse_decoder) {
2383       if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
2384         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2385         output->drop_probe_id =
2386             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2387             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2388       }
2389       GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
2390       if (output->linked == FALSE) {
2391         gst_pad_link_full (slot->src_pad, output->decoder_sink,
2392             GST_PAD_LINK_CHECK_NOTHING);
2393         output->linked = TRUE;
2394       }
2395       gst_caps_unref (new_caps);
2396       return;
2397     }
2398
2399     GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
2400
2401     if (output->linked)
2402       gst_pad_unlink (slot->src_pad, output->decoder_sink);
2403     output->linked = FALSE;
2404     if (output->drop_probe_id) {
2405       gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2406       output->drop_probe_id = 0;
2407     }
2408
2409     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2410       GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
2411       gst_caps_unref (new_caps);
2412       goto cleanup;
2413     }
2414
2415     gst_element_set_locked_state (output->decoder, TRUE);
2416     gst_element_set_state (output->decoder, GST_STATE_NULL);
2417
2418     gst_bin_remove ((GstBin *) dbin, output->decoder);
2419     output->decoder = NULL;
2420     output->decoder_latency = GST_CLOCK_TIME_NONE;
2421   } else if (output->linked) {
2422     /* Otherwise if we have no decoder yet but the output is linked make
2423      * sure that the ghost pad is really unlinked in case no decoder was
2424      * needed previously */
2425     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2426       GST_ERROR_OBJECT (dbin, "Could not release ghost pad");
2427       gst_caps_unref (new_caps);
2428       goto cleanup;
2429     }
2430   }
2431
2432   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2433   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2434
2435   /* If a decoder is required, create one */
2436   if (needs_decoder) {
2437     GList *factories, *next_factory;
2438
2439     factories = next_factory = create_decoder_factory_list (dbin, new_caps);
2440     while (!output->decoder) {
2441       gboolean decoder_failed = FALSE;
2442
2443       /* If we don't have a decoder yet, instantiate one */
2444       if (next_factory) {
2445         output->decoder = gst_element_factory_create ((GstElementFactory *)
2446             next_factory->data, NULL);
2447         GST_DEBUG ("Created decoder '%s'", GST_ELEMENT_NAME (output->decoder));
2448       } else
2449         GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT,
2450             new_caps);
2451
2452       if (output->decoder == NULL) {
2453         GstCaps *caps;
2454
2455         SELECTION_UNLOCK (dbin);
2456         /* FIXME : Should we be smarter if there's a missing decoder ?
2457          * Should we deactivate that stream ? */
2458         caps = gst_stream_get_caps (slot->active_stream);
2459         gst_element_post_message (GST_ELEMENT_CAST (dbin),
2460             gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2461         gst_caps_unref (caps);
2462         SELECTION_LOCK (dbin);
2463         goto cleanup;
2464       }
2465       if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
2466         GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
2467         goto cleanup;
2468       }
2469       output->decoder_sink =
2470           gst_element_get_static_pad (output->decoder, "sink");
2471       output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
2472       if (output->type & GST_STREAM_TYPE_VIDEO) {
2473         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2474         output->drop_probe_id =
2475             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2476             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2477       }
2478       if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
2479               GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2480         GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
2481             GST_DEBUG_PAD_NAME (output->decoder_sink));
2482         goto cleanup;
2483       }
2484       if (gst_element_set_state (output->decoder,
2485               GST_STATE_READY) == GST_STATE_CHANGE_FAILURE) {
2486         GST_DEBUG_OBJECT (dbin,
2487             "Decoder '%s' failed to reach READY state, trying the next type",
2488             GST_ELEMENT_NAME (output->decoder));
2489         decoder_failed = TRUE;
2490       }
2491       if (!gst_pad_query_accept_caps (output->decoder_sink, new_caps)) {
2492         GST_DEBUG_OBJECT (dbin,
2493             "Decoder '%s' did not accept the caps, trying the next type",
2494             GST_ELEMENT_NAME (output->decoder));
2495         decoder_failed = TRUE;
2496       }
2497       if (decoder_failed) {
2498         gst_pad_unlink (slot->src_pad, output->decoder_sink);
2499         if (output->drop_probe_id) {
2500           gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2501           output->drop_probe_id = 0;
2502         }
2503
2504         gst_element_set_locked_state (output->decoder, TRUE);
2505         gst_element_set_state (output->decoder, GST_STATE_NULL);
2506
2507         gst_bin_remove ((GstBin *) dbin, output->decoder);
2508         output->decoder = NULL;
2509       }
2510       next_factory = next_factory->next;
2511     }
2512     gst_plugin_feature_list_free (factories);
2513   } else {
2514     output->decoder_src = gst_object_ref (slot->src_pad);
2515     output->decoder_sink = NULL;
2516   }
2517   gst_caps_unref (new_caps);
2518
2519   output->linked = TRUE;
2520   if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
2521           output->decoder_src)) {
2522     GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
2523     goto cleanup;
2524   }
2525   if (output->src_exposed == FALSE) {
2526     GstEvent *stream_start;
2527
2528     stream_start = gst_pad_get_sticky_event (slot->src_pad,
2529         GST_EVENT_STREAM_START, 0);
2530
2531     /* Ensure GstStream is accesiable from pad-added callback */
2532     if (stream_start) {
2533       gst_pad_store_sticky_event (output->src_pad, stream_start);
2534       gst_event_unref (stream_start);
2535     } else {
2536       GST_WARNING_OBJECT (slot->src_pad,
2537           "Pad has no stored stream-start event");
2538     }
2539
2540     output->src_exposed = TRUE;
2541     gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
2542   }
2543
2544   if (output->decoder)
2545     gst_element_sync_state_with_parent (output->decoder);
2546
2547   output->slot = slot;
2548   return;
2549
2550 cleanup:
2551   {
2552     GST_DEBUG_OBJECT (dbin, "Cleanup");
2553     if (output->decoder_sink) {
2554       gst_object_unref (output->decoder_sink);
2555       output->decoder_sink = NULL;
2556     }
2557     if (output->decoder_src) {
2558       gst_object_unref (output->decoder_src);
2559       output->decoder_src = NULL;
2560     }
2561     if (output->decoder) {
2562       gst_element_set_state (output->decoder, GST_STATE_NULL);
2563       gst_bin_remove ((GstBin *) dbin, output->decoder);
2564       output->decoder = NULL;
2565     }
2566   }
2567 }
2568
2569 static GstPadProbeReturn
2570 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2571 {
2572   GstMessage *msg = NULL;
2573   DecodebinOutputStream *output;
2574
2575   SELECTION_LOCK (slot->dbin);
2576   output = get_output_for_slot (slot);
2577
2578   GST_DEBUG_OBJECT (pad, "output : %p", output);
2579
2580   if (output) {
2581     reconfigure_output_stream (output, slot);
2582     msg = is_selection_done (slot->dbin);
2583   }
2584   SELECTION_UNLOCK (slot->dbin);
2585   if (msg)
2586     gst_element_post_message ((GstElement *) slot->dbin, msg);
2587
2588   return GST_PAD_PROBE_REMOVE;
2589 }
2590
2591 static MultiQueueSlot *
2592 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2593 {
2594   GList *tmp;
2595
2596   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2597     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2598     const gchar *stream_id;
2599     if (slot->active_stream) {
2600       stream_id = gst_stream_get_stream_id (slot->active_stream);
2601       if (!g_strcmp0 (sid, stream_id))
2602         return slot;
2603     }
2604     if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2605       stream_id = gst_stream_get_stream_id (slot->pending_stream);
2606       if (!g_strcmp0 (sid, stream_id))
2607         return slot;
2608     }
2609   }
2610
2611   return NULL;
2612 }
2613
2614 /* This function handles the reassignment of a slot. Call this from
2615  * the streaming thread of a slot. */
2616 static gboolean
2617 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2618 {
2619   DecodebinOutputStream *output;
2620   MultiQueueSlot *target_slot = NULL;
2621   GList *tmp;
2622   const gchar *sid, *tsid;
2623
2624   SELECTION_LOCK (dbin);
2625   output = slot->output;
2626
2627   if (G_UNLIKELY (slot->active_stream == NULL)) {
2628     GST_DEBUG_OBJECT (slot->src_pad,
2629         "Called on inactive slot (active_stream == NULL)");
2630     SELECTION_UNLOCK (dbin);
2631     return FALSE;
2632   }
2633
2634   if (G_UNLIKELY (output == NULL)) {
2635     GST_DEBUG_OBJECT (slot->src_pad,
2636         "Slot doesn't have any output to be removed");
2637     SELECTION_UNLOCK (dbin);
2638     return FALSE;
2639   }
2640
2641   sid = gst_stream_get_stream_id (slot->active_stream);
2642   GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2643
2644   /* Recheck whether this stream is still in the list of streams to deactivate */
2645   if (stream_in_list (dbin->requested_selection, sid)) {
2646     /* Stream is in the list of requested streams, don't remove */
2647     SELECTION_UNLOCK (dbin);
2648     GST_DEBUG_OBJECT (slot->src_pad,
2649         "Stream '%s' doesn't need to be deactivated", sid);
2650     return FALSE;
2651   }
2652
2653   /* Unlink slot from output */
2654   /* FIXME : Handle flushing ? */
2655   /* FIXME : Handle outputs without decoders */
2656   GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2657       output->decoder_sink);
2658   if (output->decoder_sink)
2659     gst_pad_unlink (slot->src_pad, output->decoder_sink);
2660   output->linked = FALSE;
2661   slot->output = NULL;
2662   output->slot = NULL;
2663   /* Remove sid from active selection */
2664   GST_DEBUG ("Removing '%s' from active_selection", sid);
2665   for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2666     if (!g_strcmp0 (sid, tmp->data)) {
2667       g_free (tmp->data);
2668       dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2669       break;
2670     }
2671
2672   /* Can we re-assign this output to a requested stream ? */
2673   GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2674   for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2675     MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2676     GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2677         tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2678     if (tslot && tslot->type == output->type && tslot->output == NULL) {
2679       GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2680       target_slot = tslot;
2681       tsid = tmp->data;
2682       /* Pass target stream id to requested selection */
2683       dbin->requested_selection =
2684           g_list_append (dbin->requested_selection, g_strdup (tmp->data));
2685       dbin->to_activate = g_list_delete_link (dbin->to_activate, tmp);
2686       break;
2687     }
2688   }
2689
2690   if (target_slot) {
2691     GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2692         target_slot, tsid);
2693     target_slot->output = output;
2694     output->slot = target_slot;
2695     GST_DEBUG ("Adding '%s' to active_selection", tsid);
2696     dbin->active_selection =
2697         g_list_append (dbin->active_selection, (gchar *) g_strdup (tsid));
2698     SELECTION_UNLOCK (dbin);
2699
2700     /* Wakeup the target slot so that it retries to send events/buffers
2701      * thereby triggering the output reconfiguration codepath */
2702     gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2703         (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2704     /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2705   } else {
2706     GstMessage *msg;
2707
2708     dbin->output_streams = g_list_remove (dbin->output_streams, output);
2709     free_output_stream (dbin, output);
2710     msg = is_selection_done (slot->dbin);
2711     SELECTION_UNLOCK (dbin);
2712
2713     if (msg)
2714       gst_element_post_message ((GstElement *) slot->dbin, msg);
2715   }
2716
2717   return TRUE;
2718 }
2719
2720 /* Idle probe called when a slot should be unassigned from its output stream.
2721  * This is needed to ensure nothing is flowing when unlinking the slot.
2722  *
2723  * Also, this method will search for a pending stream which could re-use
2724  * the output stream. */
2725 static GstPadProbeReturn
2726 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2727     MultiQueueSlot * slot)
2728 {
2729   GstDecodebin3 *dbin = slot->dbin;
2730
2731   reassign_slot (dbin, slot);
2732
2733   return GST_PAD_PROBE_REMOVE;
2734 }
2735
2736 static gboolean
2737 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2738     guint32 seqnum)
2739 {
2740   gboolean ret = TRUE;
2741   GList *tmp;
2742   /* List of slots to (de)activate. */
2743   GList *to_deactivate = NULL;
2744   GList *to_activate = NULL;
2745   /* List of unknown stream id, most likely means the event
2746    * should be sent upstream so that elements can expose the requested stream */
2747   GList *unknown = NULL;
2748   GList *to_reassign = NULL;
2749   GList *future_request_streams = NULL;
2750   GList *pending_streams = NULL;
2751   GList *slots_to_reassign = NULL;
2752
2753   SELECTION_LOCK (dbin);
2754   if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2755     GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2756     SELECTION_UNLOCK (dbin);
2757     return TRUE;
2758   }
2759   /* Remove pending select_streams */
2760   g_list_free (dbin->pending_select_streams);
2761   dbin->pending_select_streams = NULL;
2762
2763   /* COMPARE the requested streams to the active and requested streams
2764    * on multiqueue. */
2765
2766   /* First check the slots to activate and which ones are unknown */
2767   for (tmp = select_streams; tmp; tmp = tmp->next) {
2768     const gchar *sid = (const gchar *) tmp->data;
2769     MultiQueueSlot *slot;
2770     GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
2771     slot = find_slot_for_stream_id (dbin, sid);
2772     /* Find the corresponding slot */
2773     if (slot == NULL) {
2774       if (stream_in_collection (dbin, (gchar *) sid)) {
2775         pending_streams = g_list_append (pending_streams, (gchar *) sid);
2776       } else {
2777         GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
2778         unknown = g_list_append (unknown, (gchar *) sid);
2779       }
2780     } else if (slot->output == NULL) {
2781       GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
2782           slot, sid);
2783       to_activate = g_list_append (to_activate, slot);
2784     } else {
2785       GST_DEBUG_OBJECT (dbin,
2786           "Stream '%s' from slot %p is already active on output %p", sid, slot,
2787           slot->output);
2788       future_request_streams =
2789           g_list_append (future_request_streams, (gchar *) sid);
2790     }
2791   }
2792
2793   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2794     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2795     /* For slots that have an output, check if it's part of the streams to
2796      * be active */
2797     if (slot->output) {
2798       gboolean slot_to_deactivate = TRUE;
2799
2800       if (slot->active_stream) {
2801         if (stream_in_list (select_streams,
2802                 gst_stream_get_stream_id (slot->active_stream)))
2803           slot_to_deactivate = FALSE;
2804       }
2805       if (slot_to_deactivate && slot->pending_stream
2806           && slot->pending_stream != slot->active_stream) {
2807         if (stream_in_list (select_streams,
2808                 gst_stream_get_stream_id (slot->pending_stream)))
2809           slot_to_deactivate = FALSE;
2810       }
2811       if (slot_to_deactivate) {
2812         GST_DEBUG_OBJECT (dbin,
2813             "Slot %p (%s) should be deactivated, no longer used", slot,
2814             slot->
2815             active_stream ? gst_stream_get_stream_id (slot->active_stream) :
2816             "NULL");
2817         to_deactivate = g_list_append (to_deactivate, slot);
2818       }
2819     }
2820   }
2821
2822   if (to_deactivate != NULL) {
2823     GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
2824     /* We need to compare what needs to be activated and deactivated in order
2825      * to determine whether there are outputs that can be transferred */
2826     /* Take the stream-id of the slots that are to be activated, for which there
2827      * is a slot of the same type that needs to be deactivated */
2828     tmp = to_deactivate;
2829     while (tmp) {
2830       MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
2831       gboolean removeit = FALSE;
2832       GList *tmp2, *next;
2833       GST_DEBUG_OBJECT (dbin,
2834           "Checking if slot to deactivate (%p) has a candidate slot to activate",
2835           slot_to_deactivate);
2836       for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
2837         MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
2838         GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
2839         if (slot_to_activate->type == slot_to_deactivate->type) {
2840           GST_DEBUG_OBJECT (dbin, "Re-using");
2841           to_reassign = g_list_append (to_reassign, (gchar *)
2842               gst_stream_get_stream_id (slot_to_activate->active_stream));
2843           slots_to_reassign =
2844               g_list_append (slots_to_reassign, slot_to_deactivate);
2845           to_activate = g_list_remove (to_activate, slot_to_activate);
2846           removeit = TRUE;
2847           break;
2848         }
2849       }
2850       next = tmp->next;
2851       if (removeit)
2852         to_deactivate = g_list_delete_link (to_deactivate, tmp);
2853       tmp = next;
2854     }
2855   }
2856
2857   for (tmp = to_deactivate; tmp; tmp = tmp->next) {
2858     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2859     GST_DEBUG_OBJECT (dbin,
2860         "Really need to deactivate slot %p, but no available alternative",
2861         slot);
2862
2863     slots_to_reassign = g_list_append (slots_to_reassign, slot);
2864   }
2865
2866   /* The only slots left to activate are the ones that won't be reassigned and
2867    * therefore really need to have a new output created */
2868   for (tmp = to_activate; tmp; tmp = tmp->next) {
2869     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2870     if (slot->active_stream)
2871       future_request_streams =
2872           g_list_append (future_request_streams,
2873           (gchar *) gst_stream_get_stream_id (slot->active_stream));
2874     else if (slot->pending_stream)
2875       future_request_streams =
2876           g_list_append (future_request_streams,
2877           (gchar *) gst_stream_get_stream_id (slot->pending_stream));
2878     else
2879       GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
2880   }
2881
2882   if (to_activate == NULL && pending_streams != NULL) {
2883     GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
2884     if (dbin->requested_selection)
2885       g_list_free_full (dbin->requested_selection, g_free);
2886     dbin->requested_selection =
2887         g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
2888     g_list_free (to_deactivate);
2889     g_list_free (pending_streams);
2890     to_deactivate = NULL;
2891     pending_streams = NULL;
2892   } else {
2893     if (dbin->requested_selection)
2894       g_list_free_full (dbin->requested_selection, g_free);
2895     dbin->requested_selection =
2896         g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
2897     dbin->requested_selection =
2898         g_list_concat (dbin->requested_selection,
2899         g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
2900     if (dbin->to_activate)
2901       g_list_free (dbin->to_activate);
2902     dbin->to_activate = g_list_copy (to_reassign);
2903   }
2904
2905   dbin->selection_updated = TRUE;
2906   SELECTION_UNLOCK (dbin);
2907
2908   if (unknown) {
2909     GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
2910     g_list_free (unknown);
2911   }
2912
2913   if (to_activate && !slots_to_reassign) {
2914     for (tmp = to_activate; tmp; tmp = tmp->next) {
2915       MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2916       gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2917           (GstPadProbeCallback) idle_reconfigure, slot, NULL);
2918     }
2919   }
2920
2921   /* For all streams to deactivate, add an idle probe where we will do
2922    * the unassignment and switch over */
2923   for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
2924     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2925     gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2926         (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
2927   }
2928
2929   if (to_deactivate)
2930     g_list_free (to_deactivate);
2931   if (to_activate)
2932     g_list_free (to_activate);
2933   if (to_reassign)
2934     g_list_free (to_reassign);
2935   if (future_request_streams)
2936     g_list_free (future_request_streams);
2937   if (pending_streams)
2938     g_list_free (pending_streams);
2939   if (slots_to_reassign)
2940     g_list_free (slots_to_reassign);
2941
2942   return ret;
2943 }
2944
2945 static GstPadProbeReturn
2946 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
2947     DecodebinOutputStream * output)
2948 {
2949   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2950   GstDecodebin3 *dbin = output->dbin;
2951   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
2952
2953   GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
2954
2955   switch (GST_EVENT_TYPE (event)) {
2956     case GST_EVENT_SELECT_STREAMS:
2957     {
2958       GstPad *peer;
2959       GList *streams = NULL;
2960       guint32 seqnum = gst_event_get_seqnum (event);
2961
2962       if (dbin->upstream_selected) {
2963         GST_DEBUG_OBJECT (pad, "Letting select-streams event flow upstream");
2964         break;
2965       }
2966
2967       SELECTION_LOCK (dbin);
2968       if (seqnum == dbin->select_streams_seqnum) {
2969         SELECTION_UNLOCK (dbin);
2970         GST_DEBUG_OBJECT (pad,
2971             "Already handled/handling that SELECT_STREAMS event");
2972         gst_event_unref (event);
2973         ret = GST_PAD_PROBE_HANDLED;
2974         break;
2975       }
2976       dbin->select_streams_seqnum = seqnum;
2977       if (dbin->pending_select_streams != NULL) {
2978         GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2979         g_list_free (dbin->pending_select_streams);
2980         dbin->pending_select_streams = NULL;
2981       }
2982       gst_event_parse_select_streams (event, &streams);
2983       dbin->pending_select_streams = g_list_copy (streams);
2984       SELECTION_UNLOCK (dbin);
2985
2986       /* Send event upstream */
2987       if ((peer = gst_pad_get_peer (pad))) {
2988         gst_pad_send_event (peer, event);
2989         gst_object_unref (peer);
2990       } else {
2991         gst_event_unref (event);
2992       }
2993       /* Finally handle the switch */
2994       if (streams) {
2995         handle_stream_switch (dbin, streams, seqnum);
2996         g_list_free_full (streams, g_free);
2997       }
2998       ret = GST_PAD_PROBE_HANDLED;
2999     }
3000       break;
3001     default:
3002       break;
3003   }
3004
3005   return ret;
3006 }
3007
3008 static gboolean
3009 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
3010 {
3011   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3012
3013   GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
3014   if (!dbin->upstream_selected
3015       && GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
3016     GList *streams = NULL;
3017     guint32 seqnum = gst_event_get_seqnum (event);
3018
3019     SELECTION_LOCK (dbin);
3020     if (seqnum == dbin->select_streams_seqnum) {
3021       SELECTION_UNLOCK (dbin);
3022       GST_DEBUG_OBJECT (dbin,
3023           "Already handled/handling that SELECT_STREAMS event");
3024       return TRUE;
3025     }
3026     dbin->select_streams_seqnum = seqnum;
3027     if (dbin->pending_select_streams != NULL) {
3028       GST_LOG_OBJECT (dbin, "Replacing pending select streams");
3029       g_list_free (dbin->pending_select_streams);
3030       dbin->pending_select_streams = NULL;
3031     }
3032     gst_event_parse_select_streams (event, &streams);
3033     dbin->pending_select_streams = g_list_copy (streams);
3034     SELECTION_UNLOCK (dbin);
3035
3036     /* FIXME : We don't have an upstream ?? */
3037 #if 0
3038     /* Send event upstream */
3039     if ((peer = gst_pad_get_peer (pad))) {
3040       gst_pad_send_event (peer, event);
3041       gst_object_unref (peer);
3042     }
3043 #endif
3044     /* Finally handle the switch */
3045     if (streams) {
3046       handle_stream_switch (dbin, streams, seqnum);
3047       g_list_free_full (streams, g_free);
3048     }
3049
3050     gst_event_unref (event);
3051     return TRUE;
3052   }
3053   return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
3054 }
3055
3056
3057 static void
3058 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3059 {
3060   if (slot->probe_id)
3061     gst_pad_remove_probe (slot->src_pad, slot->probe_id);
3062   if (slot->input) {
3063     if (slot->input->srcpad)
3064       gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
3065   }
3066
3067   gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
3068   gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
3069   gst_object_replace ((GstObject **) & slot->src_pad, NULL);
3070   gst_object_replace ((GstObject **) & slot->active_stream, NULL);
3071   g_free (slot);
3072 }
3073
3074 static void
3075 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3076 {
3077   GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
3078   gst_element_call_async (GST_ELEMENT_CAST (dbin),
3079       (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
3080 }
3081
3082 /* Create a DecodebinOutputStream for a given type
3083  * Note: It will be empty initially, it needs to be configured
3084  * afterwards */
3085 static DecodebinOutputStream *
3086 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
3087 {
3088   DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
3089   gchar *pad_name;
3090   const gchar *prefix;
3091   GstStaticPadTemplate *templ;
3092   GstPadTemplate *ptmpl;
3093   guint32 *counter;
3094   GstPad *internal_pad;
3095
3096   GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
3097       res, gst_stream_type_get_name (type));
3098
3099   res->type = type;
3100   res->dbin = dbin;
3101   res->decoder_latency = GST_CLOCK_TIME_NONE;
3102
3103   if (type & GST_STREAM_TYPE_VIDEO) {
3104     templ = &video_src_template;
3105     counter = &dbin->vpadcount;
3106     prefix = "video";
3107   } else if (type & GST_STREAM_TYPE_AUDIO) {
3108     templ = &audio_src_template;
3109     counter = &dbin->apadcount;
3110     prefix = "audio";
3111   } else if (type & GST_STREAM_TYPE_TEXT) {
3112     templ = &text_src_template;
3113     counter = &dbin->tpadcount;
3114     prefix = "text";
3115   } else {
3116     templ = &src_template;
3117     counter = &dbin->opadcount;
3118     prefix = "src";
3119   }
3120
3121   pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
3122   *counter += 1;
3123   ptmpl = gst_static_pad_template_get (templ);
3124   res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
3125   gst_object_unref (ptmpl);
3126   g_free (pad_name);
3127   gst_pad_set_active (res->src_pad, TRUE);
3128   /* Put an event probe on the internal proxy pad to detect upstream
3129    * events */
3130   internal_pad =
3131       (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
3132   gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
3133       (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
3134   gst_object_unref (internal_pad);
3135
3136   dbin->output_streams = g_list_append (dbin->output_streams, res);
3137
3138   return res;
3139 }
3140
3141 static void
3142 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
3143 {
3144   if (output->slot) {
3145     if (output->decoder_sink && output->decoder)
3146       gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
3147
3148     output->slot->output = NULL;
3149     output->slot = NULL;
3150   }
3151   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
3152   gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
3153   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
3154   if (output->src_exposed) {
3155     gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
3156   }
3157   if (output->decoder) {
3158     gst_element_set_locked_state (output->decoder, TRUE);
3159     gst_element_set_state (output->decoder, GST_STATE_NULL);
3160     gst_bin_remove ((GstBin *) dbin, output->decoder);
3161   }
3162   g_free (output);
3163 }
3164
3165 static GstStateChangeReturn
3166 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
3167 {
3168   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3169   GstStateChangeReturn ret;
3170
3171   /* Upwards */
3172   switch (transition) {
3173     default:
3174       break;
3175   }
3176   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3177   if (ret == GST_STATE_CHANGE_FAILURE)
3178     goto beach;
3179
3180   switch (transition) {
3181     case GST_STATE_CHANGE_PAUSED_TO_READY:
3182     {
3183       GList *tmp;
3184
3185       /* Free output streams */
3186       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
3187         DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
3188         free_output_stream (dbin, output);
3189       }
3190       g_list_free (dbin->output_streams);
3191       dbin->output_streams = NULL;
3192       /* Free multiqueue slots */
3193       for (tmp = dbin->slots; tmp; tmp = tmp->next) {
3194         MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3195         free_multiqueue_slot (dbin, slot);
3196       }
3197       g_list_free (dbin->slots);
3198       dbin->slots = NULL;
3199       dbin->current_group_id = GST_GROUP_ID_INVALID;
3200       /* Free inputs */
3201       /* Reset the main input group id since it will get a new id on a new stream */
3202       dbin->main_input->group_id = GST_GROUP_ID_INVALID;
3203       /* Reset multiqueue to default interleave */
3204       g_object_set (dbin->multiqueue, "min-interleave-time",
3205           dbin->default_mq_min_interleave, NULL);
3206       dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
3207       dbin->upstream_selected = FALSE;
3208       g_list_free_full (dbin->active_selection, g_free);
3209       dbin->active_selection = NULL;
3210     }
3211       break;
3212     default:
3213       break;
3214   }
3215 beach:
3216   return ret;
3217 }