decodebin3: Allow re-using inputs
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-base / gst / playback / gstdecodebin3.c
1 /* GStreamer
2  *
3  * Copyright (C) <2015> Centricular Ltd
4  *  @author: Edward Hervey <edward@centricular.com>
5  *  @author: Jan Schmidt <jan@centricular.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <glib.h>
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
30 #include <gst/gst.h>
31 #include <gst/pbutils/pbutils.h>
32
33 #include "gstplaybackelements.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
36
37 /**
38  * SECTION:element-decodebin3
39  * @title: decodebin3
40  *
41  * #GstBin that auto-magically constructs a decoding pipeline using available
42  * decoders and demuxers via auto-plugging. The output is raw audio, video
43  * or subtitle streams.
44  *
45  * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
46  *
47  * * supports publication and selection of stream information via
48  * GstStreamCollection messages and #GST_EVENT_SELECT_STREAMS events.
49  *
50  * * dynamically switches stream connections internally, and
51  * reuses decoder elements when stream selections change, so that in
52  * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53  * and only creates new elements when streams change and an existing decoder
54  * is not capable of handling the new format.
55  *
56  * * supports multiple input pads for the parallel decoding of auxiliary streams
57  * not muxed with the primary stream.
58  *
59  * * does not handle network stream buffering. decodebin3 expects that network stream
60  * buffering is handled upstream, before data is passed to it.
61  *
62  * > decodebin3 is still experimental API and a technology preview.
63  * > Its behaviour and exposed API is subject to change.
64  *
65  */
66
67 /*
68  * Global design
69  *
70  * 1) From sink pad to elementary streams (GstParseBin or identity)
71  *
72  * Note : If the incoming streams are push-based-only and are compatible with
73  * either the output caps or a potential decoder, the usage of parsebin is
74  * replaced by a simple passthrough identity element.
75  *
76  * The input sink pads are fed to GstParseBin. GstParseBin will feed them
77  * through typefind. When the caps are detected (or changed) we recursively
78  * figure out which demuxer, parser or depayloader is needed until we get to
79  * elementary streams.
80  *
81  * All elementary streams (whether decoded or not, whether exposed or not) are
82  * fed through multiqueue. There is only *one* multiqueue in decodebin3.
83  *
84  * => MultiQueue is the cornerstone.
85  * => No buffering before multiqueue
86  *
87  * 2) Elementary streams
88  *
89  * After GstParseBin, there are 3 main components:
90  *  1) Input Streams (provided by GstParseBin)
91  *  2) Multiqueue slots
92  *  3) Output Streams
93  *
94  * Input Streams correspond to the stream coming from GstParseBin and that gets
95  * fed into a multiqueue slot.
96  *
97  * Output Streams correspond to the combination of a (optional) decoder and an
98  * output ghostpad. Output Streams can be moved from one multiqueue slot to
99  * another, can reconfigure itself (different decoders), and can be
100  * added/removed depending on the configuration (all streams outputted, only one
101  * of each type, ...).
102  *
103  * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
104  * each 'active' Input Stream there is a corresponding slot.
105  * Slots might have different streams on input and output (due to internal
106  * buffering).
107  *
108  * Due to internal queuing/buffering/..., all those components (might) behave
109  * asynchronously. Therefore probes will be used on each component source pad to
110  * detect various key-points:
111  *  * EOS :
112  *     the stream is done => Mark that component as done, optionally freeing/removing it
113  *  * STREAM_START :
114  *     a new stream is starting => link it further if needed
115  *
116  * 3) Gradual replacement
117  *
118  * If the caps change at any point in decodebin (input sink pad, demuxer output,
119  * multiqueue output, ..), we gradually replace (if needed) the following elements.
120  *
121  * This is handled by the probes in various locations:
122  *  a) typefind output
123  *  b) multiqueue input (source pad of Input Streams)
124  *  c) multiqueue output (source pad of Multiqueue Slots)
125  *  d) final output (target of source ghostpads)
126  *
127  * When CAPS event arrive at those points, one of three things can happen:
128  * a) There is no elements downstream yet, just create/link-to following elements
129  * b) There are downstream elements, do a ACCEPT_CAPS query
130  *  b.1) The new CAPS are accepted, keep current configuration
131  *  b.2) The new CAPS are not accepted, remove following elements then do a)
132  *
133  *    Components:
134  *
135  *                                                   MultiQ     Output
136  *                     Input(s)                      Slots      Streams
137  *  /-------------------------------------------\   /-----\  /------------- \
138  *
139  * +-------------------------------------------------------------------------+
140  * |                                                                         |
141  * | +---------------------------------------------+                         |
142  * | |   GstParseBin(s)                            |                         |
143  * | |                +--------------+             |  +-----+                |
144  * | |                |              |---[parser]-[|--| Mul |---[ decoder ]-[|
145  * |]--[ typefind ]---|  demuxer(s)  |------------[|  | ti  |                |
146  * | |                |  (if needed) |---[parser]-[|--| qu  |                |
147  * | |                |              |---[parser]-[|--| eu  |---[ decoder ]-[|
148  * | |                +--------------+             |  +------             ^  |
149  * | +---------------------------------------------+        ^             |  |
150  * |                                               ^        |             |  |
151  * +-----------------------------------------------+--------+-------------+--+
152  *                                                 |        |             |
153  *                                                 |        |             |
154  *                                       Probes  --/--------/-------------/
155  *
156  * ATOMIC SWITCHING
157  *
158  * We want to ensure we re-use decoders when switching streams. This takes place
159  * at the multiqueue output level.
160  *
161  * MAIN CONCEPTS
162  *  1) Activating a stream (i.e. linking a slot to an output) is only done within
163  *    the streaming thread in the multiqueue_src_probe() and only if the
164       stream is in the REQUESTED selection.
165  *  2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
166  *    within the stream thread, but only in a purposefully called IDLE probe
167  *    that calls reassign_slot().
168  *
169  * Based on those two principles, 3 "selection" of streams (stream-id) are used:
170  * 1) requested_selection
171  *    All streams within that list should be activated
172  * 2) active_selection
173  *    List of streams that are exposed by decodebin
174  * 3) to_activate
175  *    List of streams that will be moved to requested_selection in the
176  *    reassign_slot() method (i.e. once a stream was deactivated, and the output
177  *    was retargetted)
178  */
179
180
181 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
182 #define GST_CAT_DEFAULT decodebin3_debug
183
184 #define GST_TYPE_DECODEBIN3      (gst_decodebin3_get_type ())
185
186 #define EXTRA_DEBUG 1
187
188 #define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
189 #define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"
190 static GQuark
191 _custom_final_eos_quark_get (void)
192 {
193   static gsize g_quark;
194
195   if (g_once_init_enter (&g_quark)) {
196     gsize quark =
197         (gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
198     g_once_init_leave (&g_quark, quark);
199   }
200   return g_quark;
201 }
202
203 typedef struct _GstDecodebin3 GstDecodebin3;
204 typedef struct _GstDecodebin3Class GstDecodebin3Class;
205
206 typedef struct _DecodebinInputStream DecodebinInputStream;
207 typedef struct _DecodebinInput DecodebinInput;
208 typedef struct _DecodebinOutputStream DecodebinOutputStream;
209
210 struct _GstDecodebin3
211 {
212   GstBin bin;
213
214   /* input_lock protects the following variables */
215   GMutex input_lock;
216   /* Main input (static sink pad) */
217   DecodebinInput *main_input;
218   /* Supplementary input (request sink pads) */
219   GList *other_inputs;
220   /* counter for input */
221   guint32 input_counter;
222   /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
223   /* FIXME : Needs to be reset appropriately (when upstream changes ?) */
224   guint32 current_group_id;
225   /* End of variables protected by input_lock */
226
227   GstElement *multiqueue;
228   GstClockTime default_mq_min_interleave;
229   GstClockTime current_mq_min_interleave;
230
231   /* selection_lock protects access to following variables */
232   GMutex selection_lock;
233   GList *input_streams;         /* List of DecodebinInputStream for active collection */
234   GList *output_streams;        /* List of DecodebinOutputStream used for output */
235   GList *slots;                 /* List of MultiQueueSlot */
236   guint slot_id;
237
238   /* Active collection */
239   GstStreamCollection *collection;
240   /* requested selection of stream-id to activate post-multiqueue */
241   GList *requested_selection;
242   /* list of stream-id currently activated in output */
243   GList *active_selection;
244   /* List of stream-id that need to be activated (after a stream switch for ex) */
245   GList *to_activate;
246   /* Pending select streams event */
247   guint32 select_streams_seqnum;
248   /* pending list of streams to select (from downstream) */
249   GList *pending_select_streams;
250   /* TRUE if requested_selection was updated, will become FALSE once
251    * it has fully transitioned to active */
252   gboolean selection_updated;
253   /* End of variables protected by selection_lock */
254   gboolean upstream_selected;
255
256   /* List of pending collections.
257    * FIXME : Is this really needed ? */
258   GList *pending_collection;
259
260   /* Factories */
261   GMutex factories_lock;
262   guint32 factories_cookie;
263   /* All DECODABLE factories */
264   GList *factories;
265   /* Only DECODER factories */
266   GList *decoder_factories;
267   /* DECODABLE but not DECODER factories */
268   GList *decodable_factories;
269
270   /* counters for pads */
271   guint32 apadcount, vpadcount, tpadcount, opadcount;
272
273   /* Properties */
274   GstCaps *caps;
275 };
276
277 struct _GstDecodebin3Class
278 {
279   GstBinClass class;
280
281     gint (*select_stream) (GstDecodebin3 * dbin,
282       GstStreamCollection * collection, GstStream * stream);
283 };
284
285 /* Input of decodebin, controls input pad and parsebin */
286 struct _DecodebinInput
287 {
288   GstDecodebin3 *dbin;
289
290   gboolean is_main;
291
292   GstPad *ghost_sink;
293   GstPad *parsebin_sink;
294
295   GstStreamCollection *collection;      /* Active collection */
296   gboolean upstream_selected;
297
298   guint group_id;
299
300   /* Either parsebin or identity is used */
301   GstElement *parsebin;
302   GstElement *identity;
303
304   gulong pad_added_sigid;
305   gulong pad_removed_sigid;
306   gulong drained_sigid;
307
308   /* TRUE if the input got drained
309    * FIXME : When do we reset it if re-used ?
310    */
311   gboolean drained;
312 };
313
314 /* Multiqueue Slots */
315 typedef struct _MultiQueueSlot
316 {
317   guint id;
318
319   GstDecodebin3 *dbin;
320   /* Type of stream handled by this slot */
321   GstStreamType type;
322
323   /* Linked input and output */
324   DecodebinInputStream *input;
325
326   /* pending => last stream received on sink pad */
327   GstStream *pending_stream;
328   /* active => last stream outputted on source pad */
329   GstStream *active_stream;
330
331   GstPad *sink_pad, *src_pad;
332
333   /* id of the MQ src_pad event probe */
334   gulong probe_id;
335
336   gboolean is_drained;
337
338   DecodebinOutputStream *output;
339 } MultiQueueSlot;
340
341 /* Streams that are exposed downstream (i.e. output) */
342 struct _DecodebinOutputStream
343 {
344   GstDecodebin3 *dbin;
345   /* The type of stream handled by this output stream */
346   GstStreamType type;
347
348   /* The slot to which this output stream is currently connected to */
349   MultiQueueSlot *slot;
350
351   GstElement *decoder;          /* Optional */
352   GstPad *decoder_sink, *decoder_src;
353   gboolean linked;
354
355   /* ghostpad */
356   GstPad *src_pad;
357   /* Flag if ghost pad is exposed */
358   gboolean src_exposed;
359
360   /* Reported decoder latency */
361   GstClockTime decoder_latency;
362
363   /* keyframe dropping probe */
364   gulong drop_probe_id;
365 };
366
367 /* properties */
368 enum
369 {
370   PROP_0,
371   PROP_CAPS
372 };
373
374 /* signals */
375 enum
376 {
377   SIGNAL_SELECT_STREAM,
378   SIGNAL_ABOUT_TO_FINISH,
379   LAST_SIGNAL
380 };
381 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
382
383 #define SELECTION_LOCK(dbin) G_STMT_START {                             \
384     GST_LOG_OBJECT (dbin,                                               \
385                     "selection locking from thread %p",                 \
386                     g_thread_self ());                                  \
387     g_mutex_lock (&dbin->selection_lock);                               \
388     GST_LOG_OBJECT (dbin,                                               \
389                     "selection locked from thread %p",                  \
390                     g_thread_self ());                                  \
391   } G_STMT_END
392
393 #define SELECTION_UNLOCK(dbin) G_STMT_START {                           \
394     GST_LOG_OBJECT (dbin,                                               \
395                     "selection unlocking from thread %p",               \
396                     g_thread_self ());                                  \
397     g_mutex_unlock (&dbin->selection_lock);                             \
398   } G_STMT_END
399
400 #define INPUT_LOCK(dbin) G_STMT_START {                         \
401     GST_LOG_OBJECT (dbin,                                               \
402                     "input locking from thread %p",                     \
403                     g_thread_self ());                                  \
404     g_mutex_lock (&dbin->input_lock);                           \
405     GST_LOG_OBJECT (dbin,                                               \
406                     "input locked from thread %p",                      \
407                     g_thread_self ());                                  \
408   } G_STMT_END
409
410 #define INPUT_UNLOCK(dbin) G_STMT_START {                               \
411     GST_LOG_OBJECT (dbin,                                               \
412                     "input unlocking from thread %p",           \
413                     g_thread_self ());                                  \
414     g_mutex_unlock (&dbin->input_lock);                         \
415   } G_STMT_END
416
417 GType gst_decodebin3_get_type (void);
418 #define gst_decodebin3_parent_class parent_class
419 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
420 #define _do_init \
421     GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");\
422     playback_element_init (plugin);
423 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (decodebin3, "decodebin3", GST_RANK_NONE,
424     GST_TYPE_DECODEBIN3, _do_init);
425
426 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
427
428 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
429     GST_PAD_SINK,
430     GST_PAD_ALWAYS,
431     GST_STATIC_CAPS_ANY);
432
433 static GstStaticPadTemplate request_sink_template =
434 GST_STATIC_PAD_TEMPLATE ("sink_%u",
435     GST_PAD_SINK,
436     GST_PAD_REQUEST,
437     GST_STATIC_CAPS_ANY);
438
439 static GstStaticPadTemplate video_src_template =
440 GST_STATIC_PAD_TEMPLATE ("video_%u",
441     GST_PAD_SRC,
442     GST_PAD_SOMETIMES,
443     GST_STATIC_CAPS_ANY);
444
445 static GstStaticPadTemplate audio_src_template =
446 GST_STATIC_PAD_TEMPLATE ("audio_%u",
447     GST_PAD_SRC,
448     GST_PAD_SOMETIMES,
449     GST_STATIC_CAPS_ANY);
450
451 static GstStaticPadTemplate text_src_template =
452 GST_STATIC_PAD_TEMPLATE ("text_%u",
453     GST_PAD_SRC,
454     GST_PAD_SOMETIMES,
455     GST_STATIC_CAPS_ANY);
456
457 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
458     GST_PAD_SRC,
459     GST_PAD_SOMETIMES,
460     GST_STATIC_CAPS_ANY);
461
462
463 static void gst_decodebin3_dispose (GObject * object);
464 static void gst_decodebin3_finalize (GObject * object);
465 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
466     const GValue * value, GParamSpec * pspec);
467 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
468     GValue * value, GParamSpec * pspec);
469
470 static gboolean parsebin_autoplug_continue_cb (GstElement *
471     parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
472
473 static gint
474 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
475     GstStreamCollection * collection, GstStream * stream)
476 {
477   GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
478
479   return -1;
480 }
481
482 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
483     GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
484 static void gst_decodebin3_release_pad (GstElement * element, GstPad * pad);
485 static void handle_stream_collection (GstDecodebin3 * dbin,
486     GstStreamCollection * collection, DecodebinInput * input);
487 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
488 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
489     GstStateChange transition);
490 static gboolean gst_decodebin3_send_event (GstElement * element,
491     GstEvent * event);
492
493 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
494 #if 0
495 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
496     GstElementFactoryListType ftype);
497 #endif
498
499 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
500 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
501 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
502
503 static void reconfigure_output_stream (DecodebinOutputStream * output,
504     MultiQueueSlot * slot);
505 static void free_output_stream (GstDecodebin3 * dbin,
506     DecodebinOutputStream * output);
507 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
508     GstStreamType type);
509
510 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
511     GstPadProbeInfo * info, MultiQueueSlot * slot);
512 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
513 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
514     DecodebinInputStream * input);
515 static void link_input_to_slot (DecodebinInputStream * input,
516     MultiQueueSlot * slot);
517 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
518 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
519     MultiQueueSlot * slot);
520
521 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
522 static void update_requested_selection (GstDecodebin3 * dbin);
523
524 /* FIXME: Really make all the parser stuff a self-contained helper object */
525 #include "gstdecodebin3-parse.c"
526
527 static gboolean
528 _gst_int_accumulator (GSignalInvocationHint * ihint,
529     GValue * return_accu, const GValue * handler_return, gpointer dummy)
530 {
531   gint res = g_value_get_int (handler_return);
532
533   g_value_set_int (return_accu, res);
534
535   if (res == -1)
536     return TRUE;
537
538   return FALSE;
539 }
540
541 static void
542 gst_decodebin3_class_init (GstDecodebin3Class * klass)
543 {
544   GObjectClass *gobject_klass = (GObjectClass *) klass;
545   GstElementClass *element_class = (GstElementClass *) klass;
546   GstBinClass *bin_klass = (GstBinClass *) klass;
547
548   gobject_klass->dispose = gst_decodebin3_dispose;
549   gobject_klass->finalize = gst_decodebin3_finalize;
550   gobject_klass->set_property = gst_decodebin3_set_property;
551   gobject_klass->get_property = gst_decodebin3_get_property;
552
553   /* FIXME : ADD PROPERTIES ! */
554   g_object_class_install_property (gobject_klass, PROP_CAPS,
555       g_param_spec_boxed ("caps", "Caps",
556           "The caps on which to stop decoding. (NULL = default)",
557           GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
558
559   /* FIXME : ADD SIGNALS ! */
560   /**
561    * GstDecodebin3::select-stream
562    * @decodebin: a #GstDecodebin3
563    * @collection: a #GstStreamCollection
564    * @stream: a #GstStream
565    *
566    * This signal is emitted whenever @decodebin needs to decide whether
567    * to expose a @stream of a given @collection.
568    *
569    * Note that the prefered way to select streams is to listen to
570    * GST_MESSAGE_STREAM_COLLECTION on the bus and send a
571    * GST_EVENT_SELECT_STREAMS with the streams the user wants.
572    *
573    * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
574    * A value of -1 (default) lets @decodebin decide what to do with the stream.
575    * */
576   gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
577       g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
578       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
579       _gst_int_accumulator, NULL, NULL,
580       G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
581
582   /**
583    * GstDecodebin3::about-to-finish:
584    *
585    * This signal is emitted when the data for the selected URI is
586    * entirely buffered and it is safe to specify another URI.
587    */
588   gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
589       g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
590       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
591
592
593   element_class->request_new_pad =
594       GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
595   element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
596   element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
597   element_class->release_pad = GST_DEBUG_FUNCPTR (gst_decodebin3_release_pad);
598
599   gst_element_class_add_pad_template (element_class,
600       gst_static_pad_template_get (&sink_template));
601   gst_element_class_add_pad_template (element_class,
602       gst_static_pad_template_get (&request_sink_template));
603   gst_element_class_add_pad_template (element_class,
604       gst_static_pad_template_get (&video_src_template));
605   gst_element_class_add_pad_template (element_class,
606       gst_static_pad_template_get (&audio_src_template));
607   gst_element_class_add_pad_template (element_class,
608       gst_static_pad_template_get (&text_src_template));
609   gst_element_class_add_pad_template (element_class,
610       gst_static_pad_template_get (&src_template));
611
612   gst_element_class_set_static_metadata (element_class,
613       "Decoder Bin 3", "Generic/Bin/Decoder",
614       "Autoplug and decode to raw media",
615       "Edward Hervey <edward@centricular.com>");
616
617   bin_klass->handle_message = gst_decodebin3_handle_message;
618
619   klass->select_stream = gst_decodebin3_select_stream;
620 }
621
622 static void
623 gst_decodebin3_init (GstDecodebin3 * dbin)
624 {
625   /* Create main input */
626   dbin->main_input = create_new_input (dbin, TRUE);
627
628   dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
629   g_object_get (dbin->multiqueue, "min-interleave-time",
630       &dbin->default_mq_min_interleave, NULL);
631   dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
632   g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
633       "max-size-buffers", 0, "use-interleave", TRUE, NULL);
634   gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
635
636   dbin->current_group_id = GST_GROUP_ID_INVALID;
637
638   g_mutex_init (&dbin->factories_lock);
639   g_mutex_init (&dbin->selection_lock);
640   g_mutex_init (&dbin->input_lock);
641
642   dbin->caps = gst_static_caps_get (&default_raw_caps);
643
644   GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
645 }
646
647 static void
648 gst_decodebin3_dispose (GObject * object)
649 {
650   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
651   GList *walk, *next;
652
653   if (dbin->factories) {
654     gst_plugin_feature_list_free (dbin->factories);
655     dbin->factories = NULL;
656   }
657   if (dbin->decoder_factories) {
658     g_list_free (dbin->decoder_factories);
659     dbin->decoder_factories = NULL;
660   }
661   if (dbin->decodable_factories) {
662     g_list_free (dbin->decodable_factories);
663     dbin->decodable_factories = NULL;
664   }
665   g_list_free_full (dbin->requested_selection, g_free);
666   g_list_free_full (dbin->active_selection, g_free);
667   g_list_free (dbin->to_activate);
668   g_list_free (dbin->pending_select_streams);
669
670   dbin->requested_selection = NULL;
671   dbin->active_selection = NULL;
672   dbin->to_activate = NULL;
673   dbin->pending_select_streams = NULL;
674
675   g_clear_object (&dbin->collection);
676
677   if (dbin->main_input) {
678     free_input (dbin, dbin->main_input);
679     dbin->main_input = NULL;
680   }
681
682   for (walk = dbin->other_inputs; walk; walk = next) {
683     DecodebinInput *input = walk->data;
684
685     next = g_list_next (walk);
686
687     free_input (dbin, input);
688     dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
689   }
690
691   G_OBJECT_CLASS (parent_class)->dispose (object);
692 }
693
694 static void
695 gst_decodebin3_finalize (GObject * object)
696 {
697   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
698
699   g_mutex_clear (&dbin->factories_lock);
700   g_mutex_clear (&dbin->selection_lock);
701   g_mutex_clear (&dbin->input_lock);
702
703   G_OBJECT_CLASS (parent_class)->finalize (object);
704 }
705
706 static void
707 gst_decodebin3_set_property (GObject * object, guint prop_id,
708     const GValue * value, GParamSpec * pspec)
709 {
710   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
711
712   /* FIXME : IMPLEMENT */
713   switch (prop_id) {
714     case PROP_CAPS:
715       GST_OBJECT_LOCK (dbin);
716       if (dbin->caps)
717         gst_caps_unref (dbin->caps);
718       dbin->caps = g_value_dup_boxed (value);
719       GST_OBJECT_UNLOCK (dbin);
720       break;
721     default:
722       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
723       break;
724   }
725 }
726
727 static void
728 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
729     GParamSpec * pspec)
730 {
731   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
732
733   /* FIXME : IMPLEMENT */
734   switch (prop_id) {
735     case PROP_CAPS:
736       GST_OBJECT_LOCK (dbin);
737       g_value_set_boxed (value, dbin->caps);
738       GST_OBJECT_UNLOCK (dbin);
739       break;
740     default:
741       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
742       break;
743   }
744 }
745
746 static gboolean
747 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
748     GstCaps * caps, GstDecodebin3 * dbin)
749 {
750   GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
751
752   /* If it matches our target caps, expose it */
753   if (gst_caps_can_intersect (caps, dbin->caps))
754     return FALSE;
755
756   return TRUE;
757 }
758
759 /* This method should be called whenever a STREAM_START event
760  * comes out of a given parsebin.
761  * The caller shall replace the group_id if the function returns TRUE */
762 static gboolean
763 set_input_group_id (DecodebinInput * input, guint32 * group_id)
764 {
765   GstDecodebin3 *dbin = input->dbin;
766
767   if (input->group_id != *group_id) {
768     if (input->group_id != GST_GROUP_ID_INVALID)
769       GST_WARNING_OBJECT (dbin,
770           "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
771           ") on input %p ", input->group_id, *group_id, input);
772     input->group_id = *group_id;
773   }
774
775   if (*group_id != dbin->current_group_id) {
776     /* The input is being re-used with a different incoming stream, we do want
777      * to change/unify to this new group-id */
778     if (dbin->current_group_id == GST_GROUP_ID_INVALID) {
779       GST_DEBUG_OBJECT (dbin,
780           "Setting current group id to %" G_GUINT32_FORMAT, *group_id);
781       dbin->current_group_id = *group_id;
782     }
783     *group_id = dbin->current_group_id;
784     return TRUE;
785   }
786
787   return FALSE;
788 }
789
790 static void
791 parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
792 {
793   GstDecodebin3 *dbin = input->dbin;
794   gboolean all_drained;
795   GList *tmp;
796
797   GST_INFO_OBJECT (dbin, "input %p drained", input);
798   input->drained = TRUE;
799
800   all_drained = dbin->main_input->drained;
801   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
802     DecodebinInput *data = (DecodebinInput *) tmp->data;
803
804     all_drained &= data->drained;
805   }
806
807   if (all_drained) {
808     GST_INFO_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
809     g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
810         NULL);
811   }
812 }
813
814 /* Call with INPUT_LOCK taken */
815 static gboolean
816 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
817 {
818   gboolean set_state = FALSE;
819
820   if (input->parsebin == NULL) {
821     input->parsebin = gst_element_factory_make ("parsebin", NULL);
822     if (input->parsebin == NULL)
823       goto no_parsebin;
824     input->parsebin = gst_object_ref (input->parsebin);
825     input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
826     input->pad_added_sigid =
827         g_signal_connect (input->parsebin, "pad-added",
828         (GCallback) parsebin_pad_added_cb, input);
829     input->pad_removed_sigid =
830         g_signal_connect (input->parsebin, "pad-removed",
831         (GCallback) parsebin_pad_removed_cb, input);
832     input->drained_sigid =
833         g_signal_connect (input->parsebin, "drained",
834         (GCallback) parsebin_drained_cb, input);
835     g_signal_connect (input->parsebin, "autoplug-continue",
836         (GCallback) parsebin_autoplug_continue_cb, dbin);
837   }
838
839   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
840     /* The state lock is taken so that we ensure we are the one (de)activating
841      * parsebin. We need to do this to ensure any activation taking place in
842      * parsebin (including by elements doing upstream activation) are done
843      * within the same thread. */
844     GST_STATE_LOCK (input->parsebin);
845     gst_bin_add (GST_BIN (dbin), input->parsebin);
846     set_state = TRUE;
847   }
848
849   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
850       input->parsebin_sink);
851
852   if (set_state) {
853     gst_element_sync_state_with_parent (input->parsebin);
854     GST_STATE_UNLOCK (input->parsebin);
855   }
856
857   return TRUE;
858
859   /* ERRORS */
860 no_parsebin:
861   {
862     gst_element_post_message ((GstElement *) dbin,
863         gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
864     return FALSE;
865   }
866 }
867
868 static GstPadLinkReturn
869 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
870 {
871   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
872   GstQuery *query;
873   gboolean pull_mode = FALSE;
874   GstPadLinkReturn res = GST_PAD_LINK_OK;
875   DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
876
877   g_return_val_if_fail (input, GST_PAD_LINK_REFUSED);
878
879   GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT, pad);
880
881   query = gst_query_new_scheduling ();
882   if (gst_pad_query (peer, query)
883       && gst_query_has_scheduling_mode_with_flags (query, GST_PAD_MODE_PULL,
884           GST_SCHEDULING_FLAG_SEEKABLE))
885     pull_mode = TRUE;
886   gst_query_unref (query);
887
888   GST_DEBUG_OBJECT (dbin, "Upstream can do pull-based : %d", pull_mode);
889
890   /* If upstream *can* do pull-based, we always use a parsebin. If not, we will
891    * delay that decision to a later stage (caps/stream/collection event
892    * processing) to figure out if one is really needed or whether an identity
893    * element will be enough */
894   INPUT_LOCK (dbin);
895   if (pull_mode) {
896     if (!ensure_input_parsebin (dbin, input))
897       res = GST_PAD_LINK_REFUSED;
898     else if (input->identity) {
899       GST_ERROR_OBJECT (dbin,
900           "Can't reconfigure input from push-based to pull-based");
901       res = GST_PAD_LINK_REFUSED;
902     }
903   }
904   INPUT_UNLOCK (dbin);
905
906   return res;
907 }
908
909 /* Drop duration query during _input_pad_unlink */
910 static GstPadProbeReturn
911 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
912     DecodebinInput * input)
913 {
914   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
915
916   if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
917     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
918     if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
919       GST_LOG_OBJECT (pad, "stop forwarding query duration");
920       ret = GST_PAD_PROBE_HANDLED;
921     }
922   }
923
924   return ret;
925 }
926
927 static void
928 recalculate_group_id (GstDecodebin3 * dbin)
929 {
930   guint32 common_group_id;
931   GList *iter;
932
933   common_group_id = dbin->main_input->group_id;
934
935   for (iter = dbin->other_inputs; iter; iter = iter->next) {
936     DecodebinInput *input = iter->data;
937
938     if (input->group_id != common_group_id)
939       return;
940   }
941
942   GST_DEBUG_OBJECT (dbin, "Updating global group_id to %" G_GUINT32_FORMAT,
943       common_group_id);
944   dbin->current_group_id = common_group_id;
945 }
946
947 /* CALL with INPUT LOCK */
948 static void
949 reset_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
950 {
951   GList *iter;
952
953   if (input->parsebin == NULL)
954     return;
955
956   GST_DEBUG_OBJECT (dbin, "Resetting %" GST_PTR_FORMAT, input->parsebin);
957
958   GST_STATE_LOCK (dbin);
959   gst_element_set_state (input->parsebin, GST_STATE_NULL);
960   input->drained = FALSE;
961   input->group_id = GST_GROUP_ID_INVALID;
962   recalculate_group_id (dbin);
963   for (iter = dbin->input_streams; iter; iter = iter->next) {
964     DecodebinInputStream *istream = iter->data;
965     if (istream->input == input)
966       istream->saw_eos = TRUE;
967   }
968   gst_element_sync_state_with_parent (input->parsebin);
969   GST_STATE_UNLOCK (dbin);
970 }
971
972
973 static void
974 gst_decodebin3_input_pad_unlink (GstPad * pad, GstPad * peer,
975     DecodebinInput * input)
976 {
977   GstDecodebin3 *dbin = input->dbin;
978
979   g_return_if_fail (input);
980
981   GST_LOG_OBJECT (dbin, "Got unlink on input pad %" GST_PTR_FORMAT, pad);
982
983   INPUT_LOCK (dbin);
984   if (input->parsebin == NULL) {
985     INPUT_UNLOCK (dbin);
986     return;
987   }
988
989   if (GST_PAD_MODE (pad) == GST_PAD_MODE_PULL) {
990     GST_DEBUG_OBJECT (dbin, "Resetting parsebin since it's pull-based");
991     reset_input_parsebin (dbin, input);
992   }
993
994   INPUT_UNLOCK (dbin);
995 }
996
997 static void
998 gst_decodebin3_release_pad (GstElement * element, GstPad * pad)
999 {
1000   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1001   DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
1002   GstStreamCollection *collection = NULL;
1003   gulong probe_id = 0;
1004   GstMessage *msg;
1005
1006   g_return_if_fail (input);
1007   GST_LOG_OBJECT (dbin, "Releasing pad %" GST_PTR_FORMAT, pad);
1008
1009   INPUT_LOCK (dbin);
1010
1011   /* Clear stream-collection corresponding to current INPUT and post new
1012    * stream-collection message, if needed */
1013   if (input->collection) {
1014     gst_object_unref (input->collection);
1015     input->collection = NULL;
1016   }
1017
1018   SELECTION_LOCK (dbin);
1019   collection = get_merged_collection (dbin);
1020   if (!collection) {
1021     SELECTION_UNLOCK (dbin);
1022     goto beach;
1023   }
1024   if (collection == dbin->collection) {
1025     SELECTION_UNLOCK (dbin);
1026     gst_object_unref (collection);
1027     goto beach;
1028   }
1029
1030   GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
1031
1032   if (dbin->collection)
1033     gst_object_unref (dbin->collection);
1034   dbin->collection = collection;
1035   dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1036
1037   msg =
1038       gst_message_new_stream_collection ((GstObject *) dbin, dbin->collection);
1039
1040   if (input->parsebin)
1041     /* Drop duration queries that the application might be doing while this message is posted */
1042     probe_id = gst_pad_add_probe (input->parsebin_sink,
1043         GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
1044         (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
1045
1046   SELECTION_UNLOCK (dbin);
1047   gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
1048   update_requested_selection (dbin);
1049
1050   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
1051   if (input->parsebin) {
1052     gst_bin_remove (GST_BIN (dbin), input->parsebin);
1053     gst_element_set_state (input->parsebin, GST_STATE_NULL);
1054     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
1055     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
1056     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
1057     gst_pad_remove_probe (input->parsebin_sink, probe_id);
1058     gst_object_unref (input->parsebin);
1059     gst_object_unref (input->parsebin_sink);
1060
1061     input->parsebin = NULL;
1062     input->parsebin_sink = NULL;
1063   }
1064   if (input->identity) {
1065     GstPad *idpad = gst_element_get_static_pad (input->identity, "src");
1066     DecodebinInputStream *stream = find_input_stream_for_pad (dbin, idpad);
1067     gst_object_unref (idpad);
1068     remove_input_stream (dbin, stream);
1069     gst_element_set_state (input->identity, GST_STATE_NULL);
1070     gst_bin_remove (GST_BIN (dbin), input->identity);
1071     gst_object_unref (input->identity);
1072     input->identity = NULL;
1073   }
1074
1075   if (!input->is_main) {
1076     dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
1077     free_input (dbin, input);
1078   }
1079
1080 beach:
1081   INPUT_UNLOCK (dbin);
1082   return;
1083 }
1084
1085 /* Call with INPUT LOCK */
1086 static void
1087 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
1088 {
1089   GST_DEBUG ("Freeing input %p", input);
1090
1091   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
1092   gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
1093   if (input->parsebin) {
1094     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
1095     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
1096     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
1097     gst_element_set_state (input->parsebin, GST_STATE_NULL);
1098     gst_object_unref (input->parsebin);
1099     gst_object_unref (input->parsebin_sink);
1100   }
1101   if (input->identity) {
1102     GstPad *idpad = gst_element_get_static_pad (input->identity, "src");
1103     DecodebinInputStream *stream = find_input_stream_for_pad (dbin, idpad);
1104     gst_object_unref (idpad);
1105     remove_input_stream (dbin, stream);
1106     gst_element_set_state (input->identity, GST_STATE_NULL);
1107     gst_object_unref (input->identity);
1108   }
1109   if (input->collection)
1110     gst_object_unref (input->collection);
1111   g_free (input);
1112 }
1113
1114 static gboolean
1115 sink_query_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstQuery * query)
1116 {
1117   DecodebinInput *input =
1118       g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1119
1120   g_return_val_if_fail (input, FALSE);
1121
1122   GST_DEBUG_OBJECT (sinkpad, "query %" GST_PTR_FORMAT, query);
1123
1124   /* We accept any caps, since we will reconfigure ourself internally if the new
1125    * stream is incompatible */
1126   if (GST_QUERY_TYPE (query) == GST_QUERY_ACCEPT_CAPS) {
1127     GST_DEBUG_OBJECT (dbin, "Accepting ACCEPT_CAPS query");
1128     gst_query_set_accept_caps_result (query, TRUE);
1129     return TRUE;
1130   }
1131   return gst_pad_query_default (sinkpad, GST_OBJECT (dbin), query);
1132 }
1133
1134 static gboolean
1135 is_parsebin_required_for_input (GstDecodebin3 * dbin, DecodebinInput * input,
1136     GstCaps * newcaps, GstPad * sinkpad)
1137 {
1138   gboolean parsebin_needed = TRUE;
1139   GstStream *stream;
1140
1141   stream = gst_pad_get_stream (sinkpad);
1142
1143   if (stream == NULL) {
1144     /* If upstream didn't provide a `GstStream` we will need to create a
1145      * parsebin to handle that stream */
1146     GST_DEBUG_OBJECT (sinkpad,
1147         "Need to create parsebin since upstream doesn't provide GstStream");
1148   } else if (gst_caps_can_intersect (newcaps, dbin->caps)) {
1149     /* If the incoming caps match decodebin3 output, no processing is needed */
1150     GST_FIXME_OBJECT (sinkpad, "parsebin not needed (matches output caps) !");
1151     parsebin_needed = FALSE;
1152   } else {
1153     GList *decoder_list;
1154     /* If the incoming caps are compatible with a decoder, we don't need to
1155      * process it before */
1156     g_mutex_lock (&dbin->factories_lock);
1157     gst_decode_bin_update_factories_list (dbin);
1158     decoder_list =
1159         gst_element_factory_list_filter (dbin->decoder_factories, newcaps,
1160         GST_PAD_SINK, TRUE);
1161     g_mutex_unlock (&dbin->factories_lock);
1162     if (decoder_list) {
1163       GST_FIXME_OBJECT (sinkpad, "parsebin not needed (available decoders) !");
1164       gst_plugin_feature_list_free (decoder_list);
1165       parsebin_needed = FALSE;
1166     }
1167   }
1168   if (stream)
1169     gst_object_unref (stream);
1170
1171   return parsebin_needed;
1172 }
1173
1174 static void
1175 setup_identify_for_input (GstDecodebin3 * dbin, DecodebinInput * input,
1176     GstPad * sinkpad)
1177 {
1178   GstPad *idsrc, *idsink;
1179   DecodebinInputStream *inputstream;
1180
1181   GST_DEBUG_OBJECT (sinkpad, "Adding identity for new input stream");
1182
1183   input->identity = gst_element_factory_make ("identity", NULL);
1184   /* We drop allocation queries due to our usage of multiqueue just
1185    * afterwards. It is just too dangerous.
1186    *
1187    * If application users want to have optimal raw source <=> sink allocations
1188    * they should not use decodebin3
1189    */
1190   g_object_set (input->identity, "drop-allocation", TRUE, NULL);
1191   input->identity = gst_object_ref (input->identity);
1192   idsink = gst_element_get_static_pad (input->identity, "sink");
1193   idsrc = gst_element_get_static_pad (input->identity, "src");
1194   gst_bin_add (GST_BIN (dbin), input->identity);
1195
1196   SELECTION_LOCK (dbin);
1197   inputstream = create_input_stream (dbin, idsrc, input);
1198   /* Forward any existing GstStream directly on the input stream */
1199   inputstream->active_stream = gst_pad_get_stream (sinkpad);
1200   SELECTION_UNLOCK (dbin);
1201
1202   gst_object_unref (idsrc);
1203   gst_object_unref (idsink);
1204   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), idsink);
1205   gst_element_sync_state_with_parent (input->identity);
1206 }
1207
1208 static gboolean
1209 sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
1210 {
1211   DecodebinInput *input =
1212       g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1213
1214   g_return_val_if_fail (input, FALSE);
1215
1216   GST_DEBUG_OBJECT (sinkpad, "event %" GST_PTR_FORMAT, event);
1217
1218   switch (GST_EVENT_TYPE (event)) {
1219     case GST_EVENT_STREAM_START:
1220     {
1221       GstQuery *q = gst_query_new_selectable ();
1222
1223       /* Query whether upstream can handle stream selection or not */
1224       if (gst_pad_peer_query (sinkpad, q)) {
1225         gst_query_parse_selectable (q, &input->upstream_selected);
1226         GST_DEBUG_OBJECT (sinkpad, "Upstream is selectable : %d",
1227             input->upstream_selected);
1228       } else {
1229         input->upstream_selected = FALSE;
1230         GST_DEBUG_OBJECT (sinkpad, "Upstream does not handle SELECTABLE query");
1231       }
1232       gst_query_unref (q);
1233
1234       /* FIXME : We force `decodebin3` to upstream selection mode if *any* of the
1235          inputs is. This means things might break if there's a mix */
1236       if (input->upstream_selected)
1237         dbin->upstream_selected = TRUE;
1238
1239       /* Make sure group ids will be recalculated */
1240       input->group_id = GST_GROUP_ID_INVALID;
1241       recalculate_group_id (dbin);
1242       break;
1243     }
1244     case GST_EVENT_STREAM_COLLECTION:
1245     {
1246       GstStreamCollection *collection = NULL;
1247
1248       gst_event_parse_stream_collection (event, &collection);
1249       if (collection) {
1250         INPUT_LOCK (dbin);
1251         handle_stream_collection (dbin, collection, input);
1252         gst_object_unref (collection);
1253         INPUT_UNLOCK (dbin);
1254         SELECTION_LOCK (dbin);
1255         /* Post the (potentially) updated collection */
1256         if (dbin->collection) {
1257           GstMessage *msg;
1258           msg =
1259               gst_message_new_stream_collection ((GstObject *) dbin,
1260               dbin->collection);
1261           SELECTION_UNLOCK (dbin);
1262           gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
1263           update_requested_selection (dbin);
1264         } else
1265           SELECTION_UNLOCK (dbin);
1266       }
1267
1268       /* If we are waiting to create an identity passthrough, do it now */
1269       if (!input->parsebin && !input->identity)
1270         setup_identify_for_input (dbin, input, sinkpad);
1271       break;
1272     }
1273     case GST_EVENT_CAPS:
1274     {
1275       GstCaps *newcaps = NULL;
1276
1277       gst_event_parse_caps (event, &newcaps);
1278       if (!newcaps)
1279         break;
1280       GST_DEBUG_OBJECT (sinkpad, "new caps %" GST_PTR_FORMAT, newcaps);
1281
1282       /* No parsebin or identity present, check if we can avoid creating one */
1283       if (!input->parsebin && !input->identity) {
1284         if (is_parsebin_required_for_input (dbin, input, newcaps, sinkpad)) {
1285           GST_DEBUG_OBJECT (sinkpad, "parsebin is required for input");
1286           ensure_input_parsebin (dbin, input);
1287           break;
1288         }
1289         GST_DEBUG_OBJECT (sinkpad,
1290             "parsebin not required. Will create identity passthrough element once we get the collection");
1291         break;
1292       }
1293
1294       if (input->identity) {
1295         if (is_parsebin_required_for_input (dbin, input, newcaps, sinkpad)) {
1296           GST_ERROR_OBJECT (sinkpad,
1297               "Switching from passthrough to parsebin on inputs is not supported !");
1298           gst_event_unref (event);
1299           return FALSE;
1300         }
1301         /* Nothing else to do here */
1302         break;
1303       }
1304
1305       /* Check if the parsebin present can handle the new caps */
1306       g_assert (input->parsebin);
1307       GST_DEBUG_OBJECT (sinkpad,
1308           "New caps, checking if they are compatible with existing parsebin");
1309       if (!gst_pad_query_accept_caps (input->parsebin_sink, newcaps)) {
1310         GST_DEBUG_OBJECT (sinkpad,
1311             "Parsebin doesn't accept the new caps %" GST_PTR_FORMAT, newcaps);
1312         /* Reset parsebin so that it reconfigures itself for the new stream format */
1313         INPUT_LOCK (dbin);
1314         reset_input_parsebin (dbin, input);
1315         INPUT_UNLOCK (dbin);
1316       } else {
1317         GST_DEBUG_OBJECT (sinkpad, "Parsebin accepts new caps");
1318       }
1319       break;
1320     }
1321     default:
1322       break;
1323   }
1324
1325   /* Chain to parent function */
1326   return gst_pad_event_default (sinkpad, GST_OBJECT (dbin), event);
1327 }
1328
1329 /* Call with INPUT_LOCK taken */
1330 static DecodebinInput *
1331 create_new_input (GstDecodebin3 * dbin, gboolean main)
1332 {
1333   DecodebinInput *input;
1334
1335   input = g_new0 (DecodebinInput, 1);
1336   input->dbin = dbin;
1337   input->is_main = main;
1338   input->group_id = GST_GROUP_ID_INVALID;
1339   if (main)
1340     input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
1341   else {
1342     gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
1343     input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
1344     g_free (pad_name);
1345   }
1346   input->upstream_selected = FALSE;
1347   g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
1348   gst_pad_set_event_function (input->ghost_sink,
1349       (GstPadEventFunction) sink_event_function);
1350   gst_pad_set_query_function (input->ghost_sink,
1351       (GstPadQueryFunction) sink_query_function);
1352   gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
1353   g_signal_connect (input->ghost_sink, "unlinked",
1354       (GCallback) gst_decodebin3_input_pad_unlink, input);
1355
1356   gst_pad_set_active (input->ghost_sink, TRUE);
1357   gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
1358
1359   return input;
1360
1361 }
1362
1363 static GstPad *
1364 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
1365     const gchar * name, const GstCaps * caps)
1366 {
1367   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1368   DecodebinInput *input;
1369   GstPad *res = NULL;
1370
1371   /* We are ignoring names for the time being, not sure it makes any sense
1372    * within the context of decodebin3 ... */
1373   input = create_new_input (dbin, FALSE);
1374   if (input) {
1375     INPUT_LOCK (dbin);
1376     dbin->other_inputs = g_list_append (dbin->other_inputs, input);
1377     res = input->ghost_sink;
1378     INPUT_UNLOCK (dbin);
1379   }
1380
1381   return res;
1382 }
1383
1384 /* Must be called with factories lock! */
1385 static void
1386 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
1387 {
1388   guint cookie;
1389
1390   cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
1391   if (!dbin->factories || dbin->factories_cookie != cookie) {
1392     GList *tmp;
1393     if (dbin->factories)
1394       gst_plugin_feature_list_free (dbin->factories);
1395     if (dbin->decoder_factories)
1396       g_list_free (dbin->decoder_factories);
1397     if (dbin->decodable_factories)
1398       g_list_free (dbin->decodable_factories);
1399     dbin->factories =
1400         gst_element_factory_list_get_elements
1401         (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
1402     dbin->factories =
1403         g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
1404     dbin->factories_cookie = cookie;
1405
1406     /* Filter decoder and other decodables */
1407     dbin->decoder_factories = NULL;
1408     dbin->decodable_factories = NULL;
1409     for (tmp = dbin->factories; tmp; tmp = tmp->next) {
1410       GstElementFactory *fact = (GstElementFactory *) tmp->data;
1411       if (gst_element_factory_list_is_type (fact,
1412               GST_ELEMENT_FACTORY_TYPE_DECODER))
1413         dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
1414       else
1415         dbin->decodable_factories =
1416             g_list_append (dbin->decodable_factories, fact);
1417     }
1418   }
1419 }
1420
1421 /* Must be called with appropriate lock if list is a protected variable */
1422 static const gchar *
1423 stream_in_list (GList * list, const gchar * sid)
1424 {
1425   GList *tmp;
1426
1427 #if EXTRA_DEBUG
1428   for (tmp = list; tmp; tmp = tmp->next) {
1429     gchar *osid = (gchar *) tmp->data;
1430     GST_DEBUG ("Checking %s against %s", sid, osid);
1431   }
1432 #endif
1433
1434   for (tmp = list; tmp; tmp = tmp->next) {
1435     const gchar *osid = (gchar *) tmp->data;
1436     if (!g_strcmp0 (sid, osid))
1437       return osid;
1438   }
1439
1440   return NULL;
1441 }
1442
1443 static gboolean
1444 stream_list_equal (GList * lista, GList * listb)
1445 {
1446   GList *tmp;
1447
1448   if (g_list_length (lista) != g_list_length (listb))
1449     return FALSE;
1450
1451   for (tmp = lista; tmp; tmp = tmp->next) {
1452     gchar *osid = tmp->data;
1453     if (!stream_in_list (listb, osid))
1454       return FALSE;
1455   }
1456
1457   return TRUE;
1458 }
1459
1460 static void
1461 update_requested_selection (GstDecodebin3 * dbin)
1462 {
1463   guint i, nb;
1464   GList *tmp = NULL;
1465   gboolean all_user_selected = TRUE;
1466   GstStreamType used_types = 0;
1467   GstStreamCollection *collection;
1468
1469   /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1470    *  the switch handler will take care of the pending selection */
1471   SELECTION_LOCK (dbin);
1472   if (dbin->pending_select_streams) {
1473     GST_DEBUG_OBJECT (dbin,
1474         "No need to create pending selection, SELECT_STREAMS underway");
1475     goto beach;
1476   }
1477
1478   collection = dbin->collection;
1479   if (G_UNLIKELY (collection == NULL)) {
1480     GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
1481     goto beach;
1482   }
1483   nb = gst_stream_collection_get_size (collection);
1484
1485   /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1486   GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1487
1488   /* 3. If not, check if we already have some of the streams in the
1489    * existing active/requested selection */
1490   for (i = 0; i < nb; i++) {
1491     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1492     const gchar *sid = gst_stream_get_stream_id (stream);
1493     gint request = -1;
1494     /* Fire select-stream signal to see if outside components want to
1495      * hint at which streams should be selected */
1496     g_signal_emit (G_OBJECT (dbin),
1497         gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1498         &request);
1499     GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1500
1501     if (request == -1)
1502       all_user_selected = FALSE;
1503     if (request == 1 || (request == -1
1504             && (stream_in_list (dbin->requested_selection, sid)
1505                 || stream_in_list (dbin->active_selection, sid)))) {
1506       GstStreamType curtype = gst_stream_get_stream_type (stream);
1507       if (request == 1)
1508         GST_DEBUG_OBJECT (dbin,
1509             "Using stream requested by 'select-stream' signal : %s", sid);
1510       else
1511         GST_DEBUG_OBJECT (dbin,
1512             "Re-using stream already present in requested or active selection : %s",
1513             sid);
1514       tmp = g_list_append (tmp, (gchar *) sid);
1515       used_types |= curtype;
1516     }
1517   }
1518
1519   /* 4. If the user didn't explicitly selected all streams, match one stream of each type */
1520   if (!all_user_selected && dbin->select_streams_seqnum == GST_SEQNUM_INVALID) {
1521     for (i = 0; i < nb; i++) {
1522       GstStream *stream = gst_stream_collection_get_stream (collection, i);
1523       GstStreamType curtype = gst_stream_get_stream_type (stream);
1524       if (curtype != GST_STREAM_TYPE_UNKNOWN && !(used_types & curtype)) {
1525         const gchar *sid = gst_stream_get_stream_id (stream);
1526         GST_DEBUG_OBJECT (dbin,
1527             "Automatically selecting stream '%s' of type %s", sid,
1528             gst_stream_type_get_name (curtype));
1529         tmp = g_list_append (tmp, (gchar *) sid);
1530         used_types |= curtype;
1531       }
1532     }
1533   }
1534
1535 beach:
1536   if (stream_list_equal (tmp, dbin->requested_selection)) {
1537     /* If the selection is equal, there is nothign to do */
1538     GST_DEBUG_OBJECT (dbin, "Dropping duplicate selection");
1539     g_list_free (tmp);
1540     tmp = NULL;
1541   }
1542
1543   if (tmp) {
1544     /* Finally set the requested selection */
1545     if (dbin->requested_selection) {
1546       GST_FIXME_OBJECT (dbin,
1547           "Replacing non-NULL requested_selection, what should we do ??");
1548       g_list_free_full (dbin->requested_selection, g_free);
1549     }
1550     dbin->requested_selection =
1551         g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1552     dbin->selection_updated = TRUE;
1553     g_list_free (tmp);
1554   }
1555   SELECTION_UNLOCK (dbin);
1556 }
1557
1558 /* sort_streams:
1559  * GCompareFunc to use with lists of GstStream.
1560  * Sorts GstStreams by stream type and SELECT flag and stream-id
1561  * First video, then audio, then others.
1562  *
1563  * Return: negative if a<b, 0 if a==b, positive if a>b
1564  */
1565 static gint
1566 sort_streams (GstStream * sa, GstStream * sb)
1567 {
1568   GstStreamType typea, typeb;
1569   GstStreamFlags flaga, flagb;
1570   const gchar *ida, *idb;
1571   gint ret = 0;
1572
1573   typea = gst_stream_get_stream_type (sa);
1574   typeb = gst_stream_get_stream_type (sb);
1575
1576   GST_LOG ("sa(%s), sb(%s)", gst_stream_get_stream_id (sa),
1577       gst_stream_get_stream_id (sb));
1578
1579   /* Sort by stream type. First video, then audio, then others(text, container, unknown) */
1580   if (typea != typeb) {
1581     if (typea & GST_STREAM_TYPE_VIDEO)
1582       ret = -1;
1583     else if (typea & GST_STREAM_TYPE_AUDIO)
1584       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)) ? -1 : 1;
1585     else if (typea & GST_STREAM_TYPE_TEXT)
1586       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)
1587           && !(typeb & GST_STREAM_TYPE_AUDIO)) ? -1 : 1;
1588     else if (typea & GST_STREAM_TYPE_CONTAINER)
1589       ret = (typeb & GST_STREAM_TYPE_UNKNOWN) ? -1 : 1;
1590     else
1591       ret = 1;
1592
1593     if (ret != 0) {
1594       GST_LOG ("Sort by stream-type: %d", ret);
1595       return ret;
1596     }
1597   }
1598
1599   /* Sort by SELECT flag, if stream type is same. */
1600   flaga = gst_stream_get_stream_flags (sa);
1601   flagb = gst_stream_get_stream_flags (sb);
1602
1603   ret =
1604       (flaga & GST_STREAM_FLAG_SELECT) ? ((flagb & GST_STREAM_FLAG_SELECT) ? 0 :
1605       -1) : ((flagb & GST_STREAM_FLAG_SELECT) ? 1 : 0);
1606
1607   if (ret != 0) {
1608     GST_LOG ("Sort by SELECT flag: %d", ret);
1609     return ret;
1610   }
1611
1612   /* Sort by stream-id, if otherwise the same. */
1613   ida = gst_stream_get_stream_id (sa);
1614   idb = gst_stream_get_stream_id (sb);
1615   ret = g_strcmp0 (ida, idb);
1616
1617   GST_LOG ("Sort by stream-id: %d", ret);
1618
1619   return ret;
1620 }
1621
1622 /* Call with INPUT_LOCK taken */
1623 static GstStreamCollection *
1624 get_merged_collection (GstDecodebin3 * dbin)
1625 {
1626   gboolean needs_merge = FALSE;
1627   GstStreamCollection *res = NULL;
1628   GList *tmp;
1629   GList *unsorted_streams = NULL;
1630   guint i, nb_stream;
1631
1632   /* First check if we need to do a merge or just return the only collection */
1633   res = dbin->main_input->collection;
1634
1635   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1636     DecodebinInput *input = (DecodebinInput *) tmp->data;
1637     if (input->collection) {
1638       if (res) {
1639         needs_merge = TRUE;
1640         break;
1641       }
1642       res = input->collection;
1643     }
1644   }
1645
1646   if (!needs_merge) {
1647     GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1648     return res ? gst_object_ref (res) : NULL;
1649   }
1650
1651   /* We really need to create a new collection */
1652   /* FIXME : Some numbering scheme maybe ?? */
1653   res = gst_stream_collection_new ("decodebin3");
1654   if (dbin->main_input->collection) {
1655     nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1656     GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1657     for (i = 0; i < nb_stream; i++) {
1658       GstStream *stream =
1659           gst_stream_collection_get_stream (dbin->main_input->collection, i);
1660       unsorted_streams = g_list_append (unsorted_streams, stream);
1661     }
1662   }
1663
1664   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1665     DecodebinInput *input = (DecodebinInput *) tmp->data;
1666     GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1667         input->collection);
1668     if (input->collection) {
1669       nb_stream = gst_stream_collection_get_size (input->collection);
1670       GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1671       for (i = 0; i < nb_stream; i++) {
1672         GstStream *stream =
1673             gst_stream_collection_get_stream (input->collection, i);
1674         /* Only add if not already present in the list */
1675         if (!g_list_find (unsorted_streams, stream))
1676           unsorted_streams = g_list_append (unsorted_streams, stream);
1677       }
1678     }
1679   }
1680
1681   /* re-order streams : video, then audio, then others */
1682   unsorted_streams =
1683       g_list_sort (unsorted_streams, (GCompareFunc) sort_streams);
1684   for (tmp = unsorted_streams; tmp; tmp = tmp->next) {
1685     GstStream *stream = (GstStream *) tmp->data;
1686     GST_DEBUG_OBJECT (dbin, "Adding #stream(%s) to collection",
1687         gst_stream_get_stream_id (stream));
1688     gst_stream_collection_add_stream (res, gst_object_ref (stream));
1689   }
1690
1691   if (unsorted_streams)
1692     g_list_free (unsorted_streams);
1693
1694   return res;
1695 }
1696
1697 /* Call with INPUT_LOCK taken */
1698 static DecodebinInput *
1699 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1700 {
1701   DecodebinInput *input = NULL;
1702   GstElement *parent = gst_object_ref (child);
1703   GList *tmp;
1704
1705   do {
1706     GstElement *next_parent;
1707
1708     GST_DEBUG_OBJECT (dbin, "parent %s",
1709         parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1710
1711     if (parent == dbin->main_input->parsebin) {
1712       input = dbin->main_input;
1713       break;
1714     }
1715     for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1716       DecodebinInput *cur = (DecodebinInput *) tmp->data;
1717       if (parent == cur->parsebin) {
1718         input = cur;
1719         break;
1720       }
1721     }
1722     next_parent = (GstElement *) gst_element_get_parent (parent);
1723     gst_object_unref (parent);
1724     parent = next_parent;
1725
1726   } while (parent && parent != (GstElement *) dbin);
1727
1728   if (parent)
1729     gst_object_unref (parent);
1730
1731   return input;
1732 }
1733
1734 static const gchar *
1735 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1736 {
1737   guint i, len;
1738
1739   if (dbin->collection == NULL)
1740     return NULL;
1741   len = gst_stream_collection_get_size (dbin->collection);
1742   for (i = 0; i < len; i++) {
1743     GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1744     const gchar *osid = gst_stream_get_stream_id (stream);
1745     if (!g_strcmp0 (sid, osid))
1746       return osid;
1747   }
1748
1749   return NULL;
1750 }
1751
1752 /* Call with INPUT_LOCK taken */
1753 static void
1754 handle_stream_collection (GstDecodebin3 * dbin,
1755     GstStreamCollection * collection, DecodebinInput * input)
1756 {
1757 #ifndef GST_DISABLE_GST_DEBUG
1758   const gchar *upstream_id;
1759   guint i;
1760 #endif
1761   if (!input) {
1762     GST_DEBUG_OBJECT (dbin,
1763         "Couldn't find corresponding input, most likely shutting down");
1764     return;
1765   }
1766
1767   /* Replace collection in input */
1768   if (input->collection)
1769     gst_object_unref (input->collection);
1770   input->collection = gst_object_ref (collection);
1771   GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1772       input);
1773
1774   /* Merge collection if needed */
1775   collection = get_merged_collection (dbin);
1776
1777 #ifndef GST_DISABLE_GST_DEBUG
1778   /* Just some debugging */
1779   upstream_id = gst_stream_collection_get_upstream_id (collection);
1780   GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1781   GST_DEBUG ("From input %p", input);
1782   GST_DEBUG ("  %d streams", gst_stream_collection_get_size (collection));
1783   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1784     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1785     GstTagList *taglist;
1786     GstCaps *caps;
1787
1788     GST_DEBUG ("   Stream '%s'", gst_stream_get_stream_id (stream));
1789     GST_DEBUG ("     type  : %s",
1790         gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1791     GST_DEBUG ("     flags : 0x%x", gst_stream_get_stream_flags (stream));
1792     taglist = gst_stream_get_tags (stream);
1793     GST_DEBUG ("     tags  : %" GST_PTR_FORMAT, taglist);
1794     caps = gst_stream_get_caps (stream);
1795     GST_DEBUG ("     caps  : %" GST_PTR_FORMAT, caps);
1796     if (taglist)
1797       gst_tag_list_unref (taglist);
1798     if (caps)
1799       gst_caps_unref (caps);
1800   }
1801 #endif
1802
1803   /* Store collection for later usage */
1804   SELECTION_LOCK (dbin);
1805   if (dbin->collection == NULL) {
1806     dbin->collection = collection;
1807   } else {
1808     /* We need to check who emitted this collection (the owner).
1809      * If we already had a collection from that user, this one is an update,
1810      * that is to say that we need to figure out how we are going to re-use
1811      * the streams/slot */
1812     GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1813     /* FIXME : When do we switch from pending collection to active collection ?
1814      * When all streams from active collection are drained in multiqueue output ? */
1815     gst_object_unref (dbin->collection);
1816     dbin->collection = collection;
1817     /* dbin->pending_collection = */
1818     /*     g_list_append (dbin->pending_collection, collection); */
1819   }
1820   dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1821   SELECTION_UNLOCK (dbin);
1822 }
1823
1824 /* Must be called with the selection lock taken */
1825 static void
1826 gst_decodebin3_update_min_interleave (GstDecodebin3 * dbin)
1827 {
1828   GstClockTime max_latency = GST_CLOCK_TIME_NONE;
1829   GList *tmp;
1830
1831   GST_DEBUG_OBJECT (dbin, "Recalculating max latency of decoders");
1832   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1833     DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1834     if (GST_CLOCK_TIME_IS_VALID (out->decoder_latency)) {
1835       if (max_latency == GST_CLOCK_TIME_NONE
1836           || out->decoder_latency > max_latency)
1837         max_latency = out->decoder_latency;
1838     }
1839   }
1840   GST_DEBUG_OBJECT (dbin, "max latency of all decoders: %" GST_TIME_FORMAT,
1841       GST_TIME_ARGS (max_latency));
1842
1843   if (!GST_CLOCK_TIME_IS_VALID (max_latency))
1844     return;
1845
1846   /* Make sure we keep an extra overhead */
1847   max_latency += 100 * GST_MSECOND;
1848   if (max_latency == dbin->current_mq_min_interleave)
1849     return;
1850
1851   dbin->current_mq_min_interleave = max_latency;
1852   GST_DEBUG_OBJECT (dbin, "Setting mq min-interleave to %" GST_TIME_FORMAT,
1853       GST_TIME_ARGS (dbin->current_mq_min_interleave));
1854   g_object_set (dbin->multiqueue, "min-interleave-time",
1855       dbin->current_mq_min_interleave, NULL);
1856 }
1857
1858 static void
1859 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1860 {
1861   GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1862   gboolean posting_collection = FALSE;
1863
1864   GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1865
1866   switch (GST_MESSAGE_TYPE (message)) {
1867     case GST_MESSAGE_STREAM_COLLECTION:
1868     {
1869       GstStreamCollection *collection = NULL;
1870       DecodebinInput *input;
1871
1872       INPUT_LOCK (dbin);
1873       input =
1874           find_message_parsebin (dbin,
1875           (GstElement *) GST_MESSAGE_SRC (message));
1876       if (input == NULL) {
1877         GST_DEBUG_OBJECT (dbin,
1878             "Couldn't find corresponding input, most likely shutting down");
1879         INPUT_UNLOCK (dbin);
1880         break;
1881       }
1882       if (input->upstream_selected) {
1883         GST_DEBUG_OBJECT (dbin,
1884             "Upstream handles selection, not using/forwarding collection");
1885         INPUT_UNLOCK (dbin);
1886         goto drop_message;
1887       }
1888       gst_message_parse_stream_collection (message, &collection);
1889       if (collection) {
1890         handle_stream_collection (dbin, collection, input);
1891         posting_collection = TRUE;
1892       }
1893       INPUT_UNLOCK (dbin);
1894
1895       SELECTION_LOCK (dbin);
1896       if (dbin->collection) {
1897         /* Replace collection message, we most likely aggregated it */
1898         GstMessage *new_msg;
1899         new_msg =
1900             gst_message_new_stream_collection ((GstObject *) dbin,
1901             dbin->collection);
1902         gst_message_unref (message);
1903         message = new_msg;
1904       }
1905       SELECTION_UNLOCK (dbin);
1906
1907       if (collection)
1908         gst_object_unref (collection);
1909       break;
1910     }
1911     case GST_MESSAGE_LATENCY:
1912     {
1913       GList *tmp;
1914       /* Check if this is from one of our decoders */
1915       SELECTION_LOCK (dbin);
1916       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1917         DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1918         if (out->decoder == (GstElement *) GST_MESSAGE_SRC (message)) {
1919           GstClockTime min, max;
1920           if (GST_IS_VIDEO_DECODER (out->decoder)) {
1921             gst_video_decoder_get_latency (GST_VIDEO_DECODER (out->decoder),
1922                 &min, &max);
1923             GST_DEBUG_OBJECT (dbin,
1924                 "Got latency update from one of our decoders. min: %"
1925                 GST_TIME_FORMAT " max: %" GST_TIME_FORMAT, GST_TIME_ARGS (min),
1926                 GST_TIME_ARGS (max));
1927             out->decoder_latency = min;
1928             /* Trigger recalculation */
1929             gst_decodebin3_update_min_interleave (dbin);
1930           }
1931           break;
1932         }
1933       }
1934       SELECTION_UNLOCK (dbin);
1935     }
1936     default:
1937       break;
1938   }
1939
1940   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1941
1942   if (posting_collection) {
1943     /* Figure out a selection for that collection */
1944     update_requested_selection (dbin);
1945   }
1946
1947   return;
1948
1949 drop_message:
1950   {
1951     GST_DEBUG_OBJECT (bin, "dropping message");
1952     gst_message_unref (message);
1953   }
1954 }
1955
1956 static DecodebinOutputStream *
1957 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1958 {
1959   GList *tmp;
1960   GstStreamType stype = gst_stream_get_stream_type (stream);
1961
1962   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1963     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1964     if (output->type == stype && output->slot && output->slot->active_stream) {
1965       GstStream *tstream = output->slot->active_stream;
1966       if (!stream_in_list (dbin->requested_selection,
1967               (gchar *) gst_stream_get_stream_id (tstream))) {
1968         return output;
1969       }
1970     }
1971   }
1972
1973   return NULL;
1974 }
1975
1976 /* Give a certain slot, figure out if it should be linked to an
1977  * output stream
1978  * CALL WITH SELECTION LOCK TAKEN !*/
1979 static DecodebinOutputStream *
1980 get_output_for_slot (MultiQueueSlot * slot)
1981 {
1982   GstDecodebin3 *dbin = slot->dbin;
1983   DecodebinOutputStream *output = NULL;
1984   const gchar *stream_id;
1985   GstCaps *caps;
1986   gchar *id_in_list = NULL;
1987
1988   /* If we already have a configured output, just use it */
1989   if (slot->output != NULL)
1990     return slot->output;
1991
1992   /*
1993    * FIXME
1994    *
1995    * This method needs to be split into multiple parts
1996    *
1997    * 1) Figure out whether stream should be exposed or not
1998    *   This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1999    *   in the default stream attribution
2000    *
2001    * 2) Figure out whether an output stream should be created, whether
2002    *   we can re-use the output stream already linked to the slot, or
2003    *   whether we need to get re-assigned another (currently used) output
2004    *   stream.
2005    */
2006
2007   stream_id = gst_stream_get_stream_id (slot->active_stream);
2008   caps = gst_stream_get_caps (slot->active_stream);
2009   GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
2010   gst_caps_unref (caps);
2011
2012   /* 0. Emit autoplug-continue signal for pending caps ? */
2013   GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
2014
2015   /* 1. if in EXPOSE_ALL_MODE, just accept */
2016   GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
2017
2018 #if 0
2019   /* FIXME : The idea around this was to avoid activating a stream for
2020    *     which we have no decoder. Unfortunately it is way too
2021    *     expensive. Need to figure out a better solution */
2022   /* 2. Is there a potential decoder (if one is required) */
2023   if (!gst_caps_can_intersect (caps, dbin->caps)
2024       && !have_factory (dbin, (GstCaps *) caps,
2025           GST_ELEMENT_FACTORY_TYPE_DECODER)) {
2026     GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
2027         caps);
2028     SELECTION_UNLOCK (dbin);
2029     gst_element_post_message (GST_ELEMENT_CAST (dbin),
2030         gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2031     SELECTION_LOCK (dbin);
2032     return NULL;
2033   }
2034 #endif
2035
2036   /* 3. In default mode check if we should expose */
2037   id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
2038   if (id_in_list || dbin->upstream_selected) {
2039     /* Check if we can steal an existing output stream we could re-use.
2040      * that is:
2041      * * an output stream whose slot->stream is not in requested
2042      * * and is of the same type as this stream
2043      */
2044     output = find_free_compatible_output (dbin, slot->active_stream);
2045     if (output) {
2046       /* Move this output from its current slot to this slot */
2047       dbin->to_activate =
2048           g_list_append (dbin->to_activate, (gchar *) stream_id);
2049       dbin->requested_selection =
2050           g_list_remove (dbin->requested_selection, id_in_list);
2051       g_free (id_in_list);
2052       SELECTION_UNLOCK (dbin);
2053       gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2054           (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
2055       SELECTION_LOCK (dbin);
2056       return NULL;
2057     }
2058
2059     output = create_output_stream (dbin, slot->type);
2060     output->slot = slot;
2061     GST_DEBUG ("Linking slot %p to new output %p", slot, output);
2062     slot->output = output;
2063     GST_DEBUG ("Adding '%s' to active_selection", stream_id);
2064     dbin->active_selection =
2065         g_list_append (dbin->active_selection, (gchar *) g_strdup (stream_id));
2066   } else
2067     GST_DEBUG ("Not creating any output for slot %p", slot);
2068
2069   return output;
2070 }
2071
2072 /* Returns SELECTED_STREAMS message if active_selection is equal to
2073  * requested_selection, else NULL.
2074  * Must be called with LOCK taken */
2075 static GstMessage *
2076 is_selection_done (GstDecodebin3 * dbin)
2077 {
2078   GList *tmp;
2079   GstMessage *msg;
2080
2081   if (!dbin->selection_updated)
2082     return NULL;
2083
2084   GST_LOG_OBJECT (dbin, "Checking");
2085
2086   if (dbin->to_activate != NULL) {
2087     GST_DEBUG ("Still have streams to activate");
2088     return NULL;
2089   }
2090   for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
2091     GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
2092     if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
2093       GST_DEBUG ("Not in active selection, returning");
2094       return NULL;
2095     }
2096   }
2097
2098   GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
2099
2100   /* We are completely active */
2101   msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
2102   if (dbin->select_streams_seqnum != GST_SEQNUM_INVALID) {
2103     gst_message_set_seqnum (msg, dbin->select_streams_seqnum);
2104   }
2105   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2106     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2107     if (output->slot) {
2108       GST_DEBUG_OBJECT (dbin, "Adding stream %s",
2109           gst_stream_get_stream_id (output->slot->active_stream));
2110
2111       gst_message_streams_selected_add (msg, output->slot->active_stream);
2112     } else
2113       GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
2114   }
2115   dbin->selection_updated = FALSE;
2116   return msg;
2117 }
2118
2119 /* Must be called with SELECTION_LOCK taken */
2120 static void
2121 check_all_slot_for_eos (GstDecodebin3 * dbin, GstEvent * ev)
2122 {
2123   gboolean all_drained = TRUE;
2124   GList *iter;
2125
2126   GST_DEBUG_OBJECT (dbin, "check slot for eos");
2127
2128   for (iter = dbin->slots; iter; iter = iter->next) {
2129     MultiQueueSlot *slot = iter->data;
2130
2131     if (!slot->output)
2132       continue;
2133
2134     if (slot->is_drained) {
2135       GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
2136       continue;
2137     }
2138
2139     all_drained = FALSE;
2140     break;
2141   }
2142
2143   /* Also check with the inputs, data might be pending */
2144   if (all_drained)
2145     all_drained = all_inputs_are_eos (dbin);
2146
2147   if (all_drained) {
2148     GST_DEBUG_OBJECT (dbin,
2149         "All active slots are drained, and no pending input, push EOS");
2150
2151     for (iter = dbin->input_streams; iter; iter = iter->next) {
2152       DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
2153       GstPad *peer = gst_pad_get_peer (input->srcpad);
2154
2155       /* Send EOS to all slots */
2156       if (peer) {
2157         GstEvent *stream_start, *eos;
2158
2159         stream_start =
2160             gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
2161
2162         /* First forward a custom STREAM_START event to reset the EOS status (if any) */
2163         if (stream_start) {
2164           GstStructure *s;
2165           GstEvent *custom_stream_start = gst_event_copy (stream_start);
2166           gst_event_unref (stream_start);
2167           s = (GstStructure *) gst_event_get_structure (custom_stream_start);
2168           gst_structure_set (s, "decodebin3-flushing-stream-start",
2169               G_TYPE_BOOLEAN, TRUE, NULL);
2170           gst_pad_send_event (peer, custom_stream_start);
2171         }
2172
2173         eos = gst_event_new_eos ();
2174         gst_event_set_seqnum (eos, gst_event_get_seqnum (ev));
2175         gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos),
2176             CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA,
2177             NULL);
2178         gst_pad_send_event (peer, eos);
2179         gst_object_unref (peer);
2180       } else
2181         GST_DEBUG_OBJECT (dbin, "no output");
2182     }
2183   }
2184 }
2185
2186 static GstPadProbeReturn
2187 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
2188     MultiQueueSlot * slot)
2189 {
2190   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2191   GstDecodebin3 *dbin = slot->dbin;
2192
2193   if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
2194     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
2195
2196     GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
2197     switch (GST_EVENT_TYPE (ev)) {
2198       case GST_EVENT_STREAM_START:
2199       {
2200         GstStream *stream = NULL;
2201         const GstStructure *s = gst_event_get_structure (ev);
2202
2203         /* Drop STREAM_START events used to cleanup multiqueue */
2204         if (s
2205             && gst_structure_has_field (s,
2206                 "decodebin3-flushing-stream-start")) {
2207           ret = GST_PAD_PROBE_HANDLED;
2208           gst_event_unref (ev);
2209           break;
2210         }
2211
2212         gst_event_parse_stream (ev, &stream);
2213         if (stream == NULL) {
2214           GST_ERROR_OBJECT (pad,
2215               "Got a STREAM_START event without a GstStream");
2216           break;
2217         }
2218         slot->is_drained = FALSE;
2219         GST_DEBUG_OBJECT (pad, "Stream Start '%s'",
2220             gst_stream_get_stream_id (stream));
2221         if (slot->active_stream == NULL) {
2222           slot->active_stream = stream;
2223         } else if (slot->active_stream != stream) {
2224           GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
2225               gst_stream_get_stream_id (slot->active_stream),
2226               gst_stream_get_stream_id (stream));
2227           gst_object_unref (slot->active_stream);
2228           slot->active_stream = stream;
2229         } else
2230           gst_object_unref (stream);
2231 #if 0                           /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
2232         {
2233           gboolean is_active, is_requested;
2234           /* Quick check to see if we're in the current selection */
2235           /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
2236           SELECTION_LOCK (dbin);
2237           GST_DEBUG_OBJECT (dbin, "Checking active selection");
2238           is_active = stream_in_list (dbin->active_selection, stream_id);
2239           GST_DEBUG_OBJECT (dbin, "Checking requested selection");
2240           is_requested = stream_in_list (dbin->requested_selection, stream_id);
2241           SELECTION_UNLOCK (dbin);
2242           if (is_active)
2243             GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
2244                 slot->output);
2245           if (is_requested)
2246             GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
2247                 slot->output);
2248           else if (slot->output) {
2249             GST_DEBUG_OBJECT (pad,
2250                 "Slot needs to be deactivated ? It's no longer in requested selection");
2251           } else if (!is_active)
2252             GST_DEBUG_OBJECT (pad,
2253                 "Slot in neither active nor requested selection");
2254         }
2255 #endif
2256       }
2257         break;
2258       case GST_EVENT_CAPS:
2259       {
2260         /* Configure the output slot if needed */
2261         DecodebinOutputStream *output;
2262         GstMessage *msg = NULL;
2263         SELECTION_LOCK (dbin);
2264         output = get_output_for_slot (slot);
2265         if (output) {
2266           reconfigure_output_stream (output, slot);
2267           msg = is_selection_done (dbin);
2268         }
2269         SELECTION_UNLOCK (dbin);
2270         if (msg)
2271           gst_element_post_message ((GstElement *) slot->dbin, msg);
2272       }
2273         break;
2274       case GST_EVENT_EOS:
2275       {
2276         gboolean was_drained = slot->is_drained;
2277         slot->is_drained = TRUE;
2278
2279         /* Custom EOS handling first */
2280         if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2281                 CUSTOM_EOS_QUARK)) {
2282           /* remove custom-eos */
2283           ev = gst_event_make_writable (ev);
2284           GST_PAD_PROBE_INFO_DATA (info) = ev;
2285           gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
2286               CUSTOM_EOS_QUARK, NULL, NULL);
2287
2288           GST_LOG_OBJECT (pad, "Received custom EOS");
2289           ret = GST_PAD_PROBE_HANDLED;
2290           SELECTION_LOCK (dbin);
2291           if (slot->input == NULL) {
2292             GST_DEBUG_OBJECT (pad,
2293                 "Got custom-eos from null input stream, remove output stream");
2294             /* Remove the output */
2295             if (slot->output) {
2296               DecodebinOutputStream *output = slot->output;
2297               dbin->output_streams =
2298                   g_list_remove (dbin->output_streams, output);
2299               free_output_stream (dbin, output);
2300               /* Reacalculate min interleave */
2301               gst_decodebin3_update_min_interleave (dbin);
2302             }
2303             slot->probe_id = 0;
2304             dbin->slots = g_list_remove (dbin->slots, slot);
2305             free_multiqueue_slot_async (dbin, slot);
2306             ret = GST_PAD_PROBE_REMOVE;
2307           } else if (!was_drained) {
2308             check_all_slot_for_eos (dbin, ev);
2309           }
2310           if (ret == GST_PAD_PROBE_HANDLED)
2311             gst_event_unref (ev);
2312           SELECTION_UNLOCK (dbin);
2313           break;
2314         }
2315
2316         GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
2317             slot->input);
2318         if (slot->input == NULL) {
2319           GstPad *peer;
2320           GST_DEBUG_OBJECT (pad,
2321               "last EOS for input, forwarding and removing slot");
2322           peer = gst_pad_get_peer (pad);
2323           if (peer) {
2324             gst_pad_send_event (peer, gst_event_ref (ev));
2325             gst_object_unref (peer);
2326           }
2327           SELECTION_LOCK (dbin);
2328           /* FIXME : Shouldn't we try to re-assign the output instead of just
2329            * removing it ? */
2330           /* Remove the output */
2331           if (slot->output) {
2332             DecodebinOutputStream *output = slot->output;
2333             dbin->output_streams = g_list_remove (dbin->output_streams, output);
2334             free_output_stream (dbin, output);
2335           }
2336           slot->probe_id = 0;
2337           dbin->slots = g_list_remove (dbin->slots, slot);
2338           SELECTION_UNLOCK (dbin);
2339
2340           /* FIXME: Removing the slot is async, which means actually
2341            * unlinking the pad is async. Other things like stream-start
2342            * might flow through this (now unprobed) link before it actually
2343            * gets released */
2344           free_multiqueue_slot_async (dbin, slot);
2345           ret = GST_PAD_PROBE_REMOVE;
2346         } else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2347                 CUSTOM_FINAL_EOS_QUARK)) {
2348           GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream");
2349         } else {
2350           GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)");
2351           /* drop current event as eos will be sent in check_all_slot_for_eos
2352            * when all output streams are also eos */
2353           ret = GST_PAD_PROBE_DROP;
2354           SELECTION_LOCK (dbin);
2355           check_all_slot_for_eos (dbin, ev);
2356           SELECTION_UNLOCK (dbin);
2357         }
2358       }
2359         break;
2360       default:
2361         break;
2362     }
2363   } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
2364     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
2365     switch (GST_QUERY_TYPE (query)) {
2366       case GST_QUERY_CAPS:
2367       {
2368         GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
2369         gst_query_set_caps_result (query, GST_CAPS_ANY);
2370         ret = GST_PAD_PROBE_HANDLED;
2371       }
2372         break;
2373
2374       case GST_QUERY_ACCEPT_CAPS:
2375       {
2376         GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
2377         /* If the current decoder doesn't accept caps, we'll reconfigure
2378          * on the actual caps event. So accept any caps. */
2379         gst_query_set_accept_caps_result (query, TRUE);
2380         ret = GST_PAD_PROBE_HANDLED;
2381       }
2382       default:
2383         break;
2384     }
2385   }
2386
2387   return ret;
2388 }
2389
2390 /* Create a new multiqueue slot for the given type
2391  *
2392  * It is up to the caller to know whether that slot is needed or not
2393  * (and release it when no longer needed) */
2394 static MultiQueueSlot *
2395 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
2396 {
2397   MultiQueueSlot *slot;
2398   GstIterator *it = NULL;
2399   GValue item = { 0, };
2400
2401   GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
2402       gst_stream_type_get_name (type));
2403   slot = g_new0 (MultiQueueSlot, 1);
2404   slot->dbin = dbin;
2405
2406   slot->id = dbin->slot_id++;
2407
2408   slot->type = type;
2409   slot->sink_pad = gst_element_request_pad_simple (dbin->multiqueue, "sink_%u");
2410   if (slot->sink_pad == NULL)
2411     goto fail;
2412
2413   it = gst_pad_iterate_internal_links (slot->sink_pad);
2414   if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
2415       || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
2416     GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
2417         GST_DEBUG_PAD_NAME (slot->src_pad));
2418     goto fail;
2419   }
2420   gst_iterator_free (it);
2421   g_value_reset (&item);
2422
2423   g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
2424
2425   /* Add event probe */
2426   slot->probe_id =
2427       gst_pad_add_probe (slot->src_pad,
2428       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2429       (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
2430
2431   GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
2432       GST_DEBUG_PAD_NAME (slot->src_pad));
2433
2434   dbin->slots = g_list_append (dbin->slots, slot);
2435
2436   return slot;
2437
2438   /* ERRORS */
2439 fail:
2440   {
2441     if (slot->sink_pad)
2442       gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2443     g_free (slot);
2444     return NULL;
2445   }
2446 }
2447
2448 /* Must be called with SELECTION_LOCK */
2449 static MultiQueueSlot *
2450 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
2451 {
2452   GList *tmp;
2453   MultiQueueSlot *empty_slot = NULL;
2454   GstStreamType input_type = 0;
2455   gchar *stream_id = NULL;
2456
2457   GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
2458       input, input->active_stream,
2459       input->
2460       active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
2461
2462   if (input->active_stream) {
2463     input_type = gst_stream_get_stream_type (input->active_stream);
2464     stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
2465   }
2466
2467   /* Go over existing slots and check if there is already one for it */
2468   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2469     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2470     /* Already used input, return that one */
2471     if (slot->input == input) {
2472       GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
2473       return slot;
2474     }
2475   }
2476
2477   /* Go amongst all unused slots of the right type and try to find a candidate */
2478   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2479     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2480     if (slot->input == NULL && input_type == slot->type) {
2481       /* Remember this empty slot for later */
2482       empty_slot = slot;
2483       /* Check if available slot is of the same stream_id */
2484       GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
2485           slot->id, slot->active_stream);
2486       if (stream_id && slot->active_stream) {
2487         gchar *ostream_id =
2488             (gchar *) gst_stream_get_stream_id (slot->active_stream);
2489         GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
2490             ostream_id, stream_id);
2491         if (!g_strcmp0 (stream_id, ostream_id))
2492           break;
2493       }
2494     }
2495   }
2496
2497   if (empty_slot) {
2498     GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
2499     empty_slot->input = input;
2500     return empty_slot;
2501   }
2502
2503   if (input_type)
2504     return create_new_slot (dbin, input_type);
2505
2506   return NULL;
2507 }
2508
2509 static void
2510 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
2511 {
2512   if (slot->input != NULL && slot->input != input) {
2513     GST_ERROR_OBJECT (slot->dbin,
2514         "Trying to link input to an already used slot");
2515     return;
2516   }
2517   gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
2518   slot->pending_stream = input->active_stream;
2519   slot->input = input;
2520 }
2521
2522 #if 0
2523 static gboolean
2524 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
2525     GstElementFactoryListType ftype)
2526 {
2527   gboolean ret = FALSE;
2528   GList *res;
2529
2530   g_mutex_lock (&dbin->factories_lock);
2531   gst_decode_bin_update_factories_list (dbin);
2532   if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
2533     res =
2534         gst_element_factory_list_filter (dbin->decoder_factories,
2535         caps, GST_PAD_SINK, TRUE);
2536   else
2537     res =
2538         gst_element_factory_list_filter (dbin->decodable_factories,
2539         caps, GST_PAD_SINK, TRUE);
2540   g_mutex_unlock (&dbin->factories_lock);
2541
2542   if (res) {
2543     ret = TRUE;
2544     gst_plugin_feature_list_free (res);
2545   }
2546
2547   return ret;
2548 }
2549 #endif
2550
2551 static GList *
2552 create_decoder_factory_list (GstDecodebin3 * dbin, GstCaps * caps)
2553 {
2554   GList *res;
2555
2556   g_mutex_lock (&dbin->factories_lock);
2557   gst_decode_bin_update_factories_list (dbin);
2558   res = gst_element_factory_list_filter (dbin->decoder_factories,
2559       caps, GST_PAD_SINK, TRUE);
2560   g_mutex_unlock (&dbin->factories_lock);
2561   return res;
2562 }
2563
2564 static GstPadProbeReturn
2565 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
2566     DecodebinOutputStream * output)
2567 {
2568   GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2569   /* If we have a keyframe, remove the probe and let all data through */
2570   /* FIXME : HANDLE HEADER BUFFER ?? */
2571   if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
2572       GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
2573     GST_DEBUG_OBJECT (pad,
2574         "Buffer is keyframe or header, letting through and removing probe");
2575     output->drop_probe_id = 0;
2576     return GST_PAD_PROBE_REMOVE;
2577   }
2578   GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
2579   return GST_PAD_PROBE_DROP;
2580 }
2581
2582 static void
2583 reconfigure_output_stream (DecodebinOutputStream * output,
2584     MultiQueueSlot * slot)
2585 {
2586   GstDecodebin3 *dbin = output->dbin;
2587   GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
2588   gboolean needs_decoder;
2589
2590   needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
2591
2592   GST_DEBUG_OBJECT (dbin,
2593       "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
2594       needs_decoder);
2595
2596   /* FIXME : Maybe make the output un-hook itself automatically ? */
2597   if (output->slot != NULL && output->slot != slot) {
2598     GST_WARNING_OBJECT (dbin,
2599         "Output still linked to another slot (%p)", output->slot);
2600     gst_caps_unref (new_caps);
2601     return;
2602   }
2603
2604   /* Check if existing config is reusable as-is by checking if
2605    * the existing decoder accepts the new caps, if not delete
2606    * it and create a new one */
2607   if (output->decoder) {
2608     gboolean can_reuse_decoder;
2609
2610     if (needs_decoder) {
2611       can_reuse_decoder =
2612           gst_pad_query_accept_caps (output->decoder_sink, new_caps);
2613     } else
2614       can_reuse_decoder = FALSE;
2615
2616     if (can_reuse_decoder) {
2617       if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
2618         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2619         output->drop_probe_id =
2620             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2621             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2622       }
2623       GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
2624       if (output->linked == FALSE) {
2625         gst_pad_link_full (slot->src_pad, output->decoder_sink,
2626             GST_PAD_LINK_CHECK_NOTHING);
2627         output->linked = TRUE;
2628       }
2629       gst_caps_unref (new_caps);
2630       return;
2631     }
2632
2633     GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
2634
2635     if (output->linked)
2636       gst_pad_unlink (slot->src_pad, output->decoder_sink);
2637     output->linked = FALSE;
2638     if (output->drop_probe_id) {
2639       gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2640       output->drop_probe_id = 0;
2641     }
2642
2643     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2644       GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
2645       gst_caps_unref (new_caps);
2646       goto cleanup;
2647     }
2648
2649     gst_element_set_locked_state (output->decoder, TRUE);
2650     gst_element_set_state (output->decoder, GST_STATE_NULL);
2651
2652     gst_bin_remove ((GstBin *) dbin, output->decoder);
2653     output->decoder = NULL;
2654     output->decoder_latency = GST_CLOCK_TIME_NONE;
2655   } else if (output->linked) {
2656     /* Otherwise if we have no decoder yet but the output is linked make
2657      * sure that the ghost pad is really unlinked in case no decoder was
2658      * needed previously */
2659     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2660       GST_ERROR_OBJECT (dbin, "Could not release ghost pad");
2661       gst_caps_unref (new_caps);
2662       goto cleanup;
2663     }
2664   }
2665
2666   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2667   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2668
2669   /* If a decoder is required, create one */
2670   if (needs_decoder) {
2671     GList *factories, *next_factory;
2672
2673     factories = next_factory = create_decoder_factory_list (dbin, new_caps);
2674     while (!output->decoder) {
2675       gboolean decoder_failed = FALSE;
2676
2677       /* If we don't have a decoder yet, instantiate one */
2678       if (next_factory) {
2679         output->decoder = gst_element_factory_create ((GstElementFactory *)
2680             next_factory->data, NULL);
2681         GST_DEBUG ("Created decoder '%s'", GST_ELEMENT_NAME (output->decoder));
2682       } else
2683         GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT,
2684             new_caps);
2685
2686       if (output->decoder == NULL) {
2687         GstCaps *caps;
2688
2689         SELECTION_UNLOCK (dbin);
2690         /* FIXME : Should we be smarter if there's a missing decoder ?
2691          * Should we deactivate that stream ? */
2692         caps = gst_stream_get_caps (slot->active_stream);
2693         gst_element_post_message (GST_ELEMENT_CAST (dbin),
2694             gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2695         gst_caps_unref (caps);
2696         SELECTION_LOCK (dbin);
2697         goto cleanup;
2698       }
2699       if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
2700         GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
2701         goto cleanup;
2702       }
2703       output->decoder_sink =
2704           gst_element_get_static_pad (output->decoder, "sink");
2705       output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
2706       if (output->type & GST_STREAM_TYPE_VIDEO) {
2707         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2708         output->drop_probe_id =
2709             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2710             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2711       }
2712       if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
2713               GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2714         GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
2715             GST_DEBUG_PAD_NAME (output->decoder_sink));
2716         goto cleanup;
2717       }
2718       if (gst_element_set_state (output->decoder,
2719               GST_STATE_READY) == GST_STATE_CHANGE_FAILURE) {
2720         GST_DEBUG_OBJECT (dbin,
2721             "Decoder '%s' failed to reach READY state, trying the next type",
2722             GST_ELEMENT_NAME (output->decoder));
2723         decoder_failed = TRUE;
2724       }
2725       if (!gst_pad_query_accept_caps (output->decoder_sink, new_caps)) {
2726         GST_DEBUG_OBJECT (dbin,
2727             "Decoder '%s' did not accept the caps, trying the next type",
2728             GST_ELEMENT_NAME (output->decoder));
2729         decoder_failed = TRUE;
2730       }
2731       if (decoder_failed) {
2732         gst_pad_unlink (slot->src_pad, output->decoder_sink);
2733         if (output->drop_probe_id) {
2734           gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2735           output->drop_probe_id = 0;
2736         }
2737
2738         gst_element_set_locked_state (output->decoder, TRUE);
2739         gst_element_set_state (output->decoder, GST_STATE_NULL);
2740
2741         gst_bin_remove ((GstBin *) dbin, output->decoder);
2742         output->decoder = NULL;
2743       }
2744       next_factory = next_factory->next;
2745     }
2746     gst_plugin_feature_list_free (factories);
2747   } else {
2748     output->decoder_src = gst_object_ref (slot->src_pad);
2749     output->decoder_sink = NULL;
2750   }
2751   gst_caps_unref (new_caps);
2752
2753   output->linked = TRUE;
2754   if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
2755           output->decoder_src)) {
2756     GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
2757     goto cleanup;
2758   }
2759   if (output->src_exposed == FALSE) {
2760     GstEvent *stream_start;
2761
2762     stream_start = gst_pad_get_sticky_event (slot->src_pad,
2763         GST_EVENT_STREAM_START, 0);
2764
2765     /* Ensure GstStream is accesiable from pad-added callback */
2766     if (stream_start) {
2767       gst_pad_store_sticky_event (output->src_pad, stream_start);
2768       gst_event_unref (stream_start);
2769     } else {
2770       GST_WARNING_OBJECT (slot->src_pad,
2771           "Pad has no stored stream-start event");
2772     }
2773
2774     output->src_exposed = TRUE;
2775     gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
2776   }
2777
2778   if (output->decoder)
2779     gst_element_sync_state_with_parent (output->decoder);
2780
2781   output->slot = slot;
2782   return;
2783
2784 cleanup:
2785   {
2786     GST_DEBUG_OBJECT (dbin, "Cleanup");
2787     if (output->decoder_sink) {
2788       gst_object_unref (output->decoder_sink);
2789       output->decoder_sink = NULL;
2790     }
2791     if (output->decoder_src) {
2792       gst_object_unref (output->decoder_src);
2793       output->decoder_src = NULL;
2794     }
2795     if (output->decoder) {
2796       gst_element_set_state (output->decoder, GST_STATE_NULL);
2797       gst_bin_remove ((GstBin *) dbin, output->decoder);
2798       output->decoder = NULL;
2799     }
2800   }
2801 }
2802
2803 static GstPadProbeReturn
2804 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2805 {
2806   GstMessage *msg = NULL;
2807   DecodebinOutputStream *output;
2808
2809   SELECTION_LOCK (slot->dbin);
2810   output = get_output_for_slot (slot);
2811
2812   GST_DEBUG_OBJECT (pad, "output : %p", output);
2813
2814   if (output) {
2815     reconfigure_output_stream (output, slot);
2816     msg = is_selection_done (slot->dbin);
2817   }
2818   SELECTION_UNLOCK (slot->dbin);
2819   if (msg)
2820     gst_element_post_message ((GstElement *) slot->dbin, msg);
2821
2822   return GST_PAD_PROBE_REMOVE;
2823 }
2824
2825 static MultiQueueSlot *
2826 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2827 {
2828   GList *tmp;
2829
2830   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2831     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2832     const gchar *stream_id;
2833     if (slot->active_stream) {
2834       stream_id = gst_stream_get_stream_id (slot->active_stream);
2835       if (!g_strcmp0 (sid, stream_id))
2836         return slot;
2837     }
2838     if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2839       stream_id = gst_stream_get_stream_id (slot->pending_stream);
2840       if (!g_strcmp0 (sid, stream_id))
2841         return slot;
2842     }
2843   }
2844
2845   return NULL;
2846 }
2847
2848 /* This function handles the reassignment of a slot. Call this from
2849  * the streaming thread of a slot. */
2850 static gboolean
2851 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2852 {
2853   DecodebinOutputStream *output;
2854   MultiQueueSlot *target_slot = NULL;
2855   GList *tmp;
2856   const gchar *sid, *tsid;
2857
2858   SELECTION_LOCK (dbin);
2859   output = slot->output;
2860
2861   if (G_UNLIKELY (slot->active_stream == NULL)) {
2862     GST_DEBUG_OBJECT (slot->src_pad,
2863         "Called on inactive slot (active_stream == NULL)");
2864     SELECTION_UNLOCK (dbin);
2865     return FALSE;
2866   }
2867
2868   if (G_UNLIKELY (output == NULL)) {
2869     GST_DEBUG_OBJECT (slot->src_pad,
2870         "Slot doesn't have any output to be removed");
2871     SELECTION_UNLOCK (dbin);
2872     return FALSE;
2873   }
2874
2875   sid = gst_stream_get_stream_id (slot->active_stream);
2876   GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2877
2878   /* Recheck whether this stream is still in the list of streams to deactivate */
2879   if (stream_in_list (dbin->requested_selection, sid)) {
2880     /* Stream is in the list of requested streams, don't remove */
2881     SELECTION_UNLOCK (dbin);
2882     GST_DEBUG_OBJECT (slot->src_pad,
2883         "Stream '%s' doesn't need to be deactivated", sid);
2884     return FALSE;
2885   }
2886
2887   /* Unlink slot from output */
2888   /* FIXME : Handle flushing ? */
2889   /* FIXME : Handle outputs without decoders */
2890   GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2891       output->decoder_sink);
2892   if (output->decoder_sink)
2893     gst_pad_unlink (slot->src_pad, output->decoder_sink);
2894   output->linked = FALSE;
2895   slot->output = NULL;
2896   output->slot = NULL;
2897   /* Remove sid from active selection */
2898   GST_DEBUG ("Removing '%s' from active_selection", sid);
2899   for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2900     if (!g_strcmp0 (sid, tmp->data)) {
2901       g_free (tmp->data);
2902       dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2903       break;
2904     }
2905
2906   /* Can we re-assign this output to a requested stream ? */
2907   GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2908   for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2909     MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2910     GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2911         tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2912     if (tslot && tslot->type == output->type && tslot->output == NULL) {
2913       GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2914       target_slot = tslot;
2915       tsid = tmp->data;
2916       /* Pass target stream id to requested selection */
2917       dbin->requested_selection =
2918           g_list_append (dbin->requested_selection, g_strdup (tmp->data));
2919       dbin->to_activate = g_list_delete_link (dbin->to_activate, tmp);
2920       break;
2921     }
2922   }
2923
2924   if (target_slot) {
2925     GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2926         target_slot, tsid);
2927     target_slot->output = output;
2928     output->slot = target_slot;
2929     GST_DEBUG ("Adding '%s' to active_selection", tsid);
2930     dbin->active_selection =
2931         g_list_append (dbin->active_selection, (gchar *) g_strdup (tsid));
2932     SELECTION_UNLOCK (dbin);
2933
2934     /* Wakeup the target slot so that it retries to send events/buffers
2935      * thereby triggering the output reconfiguration codepath */
2936     gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2937         (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2938     /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2939   } else {
2940     GstMessage *msg;
2941
2942     dbin->output_streams = g_list_remove (dbin->output_streams, output);
2943     free_output_stream (dbin, output);
2944     msg = is_selection_done (slot->dbin);
2945     SELECTION_UNLOCK (dbin);
2946
2947     if (msg)
2948       gst_element_post_message ((GstElement *) slot->dbin, msg);
2949   }
2950
2951   return TRUE;
2952 }
2953
2954 /* Idle probe called when a slot should be unassigned from its output stream.
2955  * This is needed to ensure nothing is flowing when unlinking the slot.
2956  *
2957  * Also, this method will search for a pending stream which could re-use
2958  * the output stream. */
2959 static GstPadProbeReturn
2960 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2961     MultiQueueSlot * slot)
2962 {
2963   GstDecodebin3 *dbin = slot->dbin;
2964
2965   reassign_slot (dbin, slot);
2966
2967   return GST_PAD_PROBE_REMOVE;
2968 }
2969
2970 static gboolean
2971 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2972     guint32 seqnum)
2973 {
2974   gboolean ret = TRUE;
2975   GList *tmp;
2976   /* List of slots to (de)activate. */
2977   GList *to_deactivate = NULL;
2978   GList *to_activate = NULL;
2979   /* List of unknown stream id, most likely means the event
2980    * should be sent upstream so that elements can expose the requested stream */
2981   GList *unknown = NULL;
2982   GList *to_reassign = NULL;
2983   GList *future_request_streams = NULL;
2984   GList *pending_streams = NULL;
2985   GList *slots_to_reassign = NULL;
2986
2987   SELECTION_LOCK (dbin);
2988   if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2989     GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2990     SELECTION_UNLOCK (dbin);
2991     return TRUE;
2992   }
2993   /* Remove pending select_streams */
2994   g_list_free (dbin->pending_select_streams);
2995   dbin->pending_select_streams = NULL;
2996
2997   /* COMPARE the requested streams to the active and requested streams
2998    * on multiqueue. */
2999
3000   /* First check the slots to activate and which ones are unknown */
3001   for (tmp = select_streams; tmp; tmp = tmp->next) {
3002     const gchar *sid = (const gchar *) tmp->data;
3003     MultiQueueSlot *slot;
3004     GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
3005     slot = find_slot_for_stream_id (dbin, sid);
3006     /* Find the corresponding slot */
3007     if (slot == NULL) {
3008       if (stream_in_collection (dbin, (gchar *) sid)) {
3009         pending_streams = g_list_append (pending_streams, (gchar *) sid);
3010       } else {
3011         GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
3012         unknown = g_list_append (unknown, (gchar *) sid);
3013       }
3014     } else if (slot->output == NULL) {
3015       GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
3016           slot, sid);
3017       to_activate = g_list_append (to_activate, slot);
3018     } else {
3019       GST_DEBUG_OBJECT (dbin,
3020           "Stream '%s' from slot %p is already active on output %p", sid, slot,
3021           slot->output);
3022       future_request_streams =
3023           g_list_append (future_request_streams, (gchar *) sid);
3024     }
3025   }
3026
3027   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
3028     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3029     /* For slots that have an output, check if it's part of the streams to
3030      * be active */
3031     if (slot->output) {
3032       gboolean slot_to_deactivate = TRUE;
3033
3034       if (slot->active_stream) {
3035         if (stream_in_list (select_streams,
3036                 gst_stream_get_stream_id (slot->active_stream)))
3037           slot_to_deactivate = FALSE;
3038       }
3039       if (slot_to_deactivate && slot->pending_stream
3040           && slot->pending_stream != slot->active_stream) {
3041         if (stream_in_list (select_streams,
3042                 gst_stream_get_stream_id (slot->pending_stream)))
3043           slot_to_deactivate = FALSE;
3044       }
3045       if (slot_to_deactivate) {
3046         GST_DEBUG_OBJECT (dbin,
3047             "Slot %p (%s) should be deactivated, no longer used", slot,
3048             slot->
3049             active_stream ? gst_stream_get_stream_id (slot->active_stream) :
3050             "NULL");
3051         to_deactivate = g_list_append (to_deactivate, slot);
3052       }
3053     }
3054   }
3055
3056   if (to_deactivate != NULL) {
3057     GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
3058     /* We need to compare what needs to be activated and deactivated in order
3059      * to determine whether there are outputs that can be transferred */
3060     /* Take the stream-id of the slots that are to be activated, for which there
3061      * is a slot of the same type that needs to be deactivated */
3062     tmp = to_deactivate;
3063     while (tmp) {
3064       MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
3065       gboolean removeit = FALSE;
3066       GList *tmp2, *next;
3067       GST_DEBUG_OBJECT (dbin,
3068           "Checking if slot to deactivate (%p) has a candidate slot to activate",
3069           slot_to_deactivate);
3070       for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
3071         MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
3072         GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
3073         if (slot_to_activate->type == slot_to_deactivate->type) {
3074           GST_DEBUG_OBJECT (dbin, "Re-using");
3075           to_reassign = g_list_append (to_reassign, (gchar *)
3076               gst_stream_get_stream_id (slot_to_activate->active_stream));
3077           slots_to_reassign =
3078               g_list_append (slots_to_reassign, slot_to_deactivate);
3079           to_activate = g_list_remove (to_activate, slot_to_activate);
3080           removeit = TRUE;
3081           break;
3082         }
3083       }
3084       next = tmp->next;
3085       if (removeit)
3086         to_deactivate = g_list_delete_link (to_deactivate, tmp);
3087       tmp = next;
3088     }
3089   }
3090
3091   for (tmp = to_deactivate; tmp; tmp = tmp->next) {
3092     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3093     GST_DEBUG_OBJECT (dbin,
3094         "Really need to deactivate slot %p, but no available alternative",
3095         slot);
3096
3097     slots_to_reassign = g_list_append (slots_to_reassign, slot);
3098   }
3099
3100   /* The only slots left to activate are the ones that won't be reassigned and
3101    * therefore really need to have a new output created */
3102   for (tmp = to_activate; tmp; tmp = tmp->next) {
3103     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3104     if (slot->active_stream)
3105       future_request_streams =
3106           g_list_append (future_request_streams,
3107           (gchar *) gst_stream_get_stream_id (slot->active_stream));
3108     else if (slot->pending_stream)
3109       future_request_streams =
3110           g_list_append (future_request_streams,
3111           (gchar *) gst_stream_get_stream_id (slot->pending_stream));
3112     else
3113       GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
3114   }
3115
3116   if (to_activate == NULL && pending_streams != NULL) {
3117     GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
3118     if (dbin->requested_selection)
3119       g_list_free_full (dbin->requested_selection, g_free);
3120     dbin->requested_selection =
3121         g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
3122     g_list_free (to_deactivate);
3123     g_list_free (pending_streams);
3124     to_deactivate = NULL;
3125     pending_streams = NULL;
3126   } else {
3127     if (dbin->requested_selection)
3128       g_list_free_full (dbin->requested_selection, g_free);
3129     dbin->requested_selection =
3130         g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
3131     dbin->requested_selection =
3132         g_list_concat (dbin->requested_selection,
3133         g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
3134     if (dbin->to_activate)
3135       g_list_free (dbin->to_activate);
3136     dbin->to_activate = g_list_copy (to_reassign);
3137   }
3138
3139   dbin->selection_updated = TRUE;
3140   SELECTION_UNLOCK (dbin);
3141
3142   if (unknown) {
3143     GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
3144     g_list_free (unknown);
3145   }
3146
3147   if (to_activate && !slots_to_reassign) {
3148     for (tmp = to_activate; tmp; tmp = tmp->next) {
3149       MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3150       gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
3151           (GstPadProbeCallback) idle_reconfigure, slot, NULL);
3152     }
3153   }
3154
3155   /* For all streams to deactivate, add an idle probe where we will do
3156    * the unassignment and switch over */
3157   for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
3158     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3159     gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
3160         (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
3161   }
3162
3163   if (to_deactivate)
3164     g_list_free (to_deactivate);
3165   if (to_activate)
3166     g_list_free (to_activate);
3167   if (to_reassign)
3168     g_list_free (to_reassign);
3169   if (future_request_streams)
3170     g_list_free (future_request_streams);
3171   if (pending_streams)
3172     g_list_free (pending_streams);
3173   if (slots_to_reassign)
3174     g_list_free (slots_to_reassign);
3175
3176   return ret;
3177 }
3178
3179 static GstPadProbeReturn
3180 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
3181     DecodebinOutputStream * output)
3182 {
3183   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
3184   GstDecodebin3 *dbin = output->dbin;
3185   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
3186
3187   GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
3188
3189   switch (GST_EVENT_TYPE (event)) {
3190     case GST_EVENT_SELECT_STREAMS:
3191     {
3192       GstPad *peer;
3193       GList *streams = NULL;
3194       guint32 seqnum = gst_event_get_seqnum (event);
3195
3196       if (dbin->upstream_selected) {
3197         GST_DEBUG_OBJECT (pad, "Letting select-streams event flow upstream");
3198         break;
3199       }
3200
3201       SELECTION_LOCK (dbin);
3202       if (seqnum == dbin->select_streams_seqnum) {
3203         SELECTION_UNLOCK (dbin);
3204         GST_DEBUG_OBJECT (pad,
3205             "Already handled/handling that SELECT_STREAMS event");
3206         gst_event_unref (event);
3207         ret = GST_PAD_PROBE_HANDLED;
3208         break;
3209       }
3210       dbin->select_streams_seqnum = seqnum;
3211       if (dbin->pending_select_streams != NULL) {
3212         GST_LOG_OBJECT (dbin, "Replacing pending select streams");
3213         g_list_free (dbin->pending_select_streams);
3214         dbin->pending_select_streams = NULL;
3215       }
3216       gst_event_parse_select_streams (event, &streams);
3217       dbin->pending_select_streams = g_list_copy (streams);
3218       SELECTION_UNLOCK (dbin);
3219
3220       /* Send event upstream */
3221       if ((peer = gst_pad_get_peer (pad))) {
3222         gst_pad_send_event (peer, event);
3223         gst_object_unref (peer);
3224       } else {
3225         gst_event_unref (event);
3226       }
3227       /* Finally handle the switch */
3228       if (streams) {
3229         handle_stream_switch (dbin, streams, seqnum);
3230         g_list_free_full (streams, g_free);
3231       }
3232       ret = GST_PAD_PROBE_HANDLED;
3233     }
3234       break;
3235     default:
3236       break;
3237   }
3238
3239   return ret;
3240 }
3241
3242 static gboolean
3243 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
3244 {
3245   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3246
3247   GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
3248   if (!dbin->upstream_selected
3249       && GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
3250     GList *streams = NULL;
3251     guint32 seqnum = gst_event_get_seqnum (event);
3252
3253     SELECTION_LOCK (dbin);
3254     if (seqnum == dbin->select_streams_seqnum) {
3255       SELECTION_UNLOCK (dbin);
3256       GST_DEBUG_OBJECT (dbin,
3257           "Already handled/handling that SELECT_STREAMS event");
3258       return TRUE;
3259     }
3260     dbin->select_streams_seqnum = seqnum;
3261     if (dbin->pending_select_streams != NULL) {
3262       GST_LOG_OBJECT (dbin, "Replacing pending select streams");
3263       g_list_free (dbin->pending_select_streams);
3264       dbin->pending_select_streams = NULL;
3265     }
3266     gst_event_parse_select_streams (event, &streams);
3267     dbin->pending_select_streams = g_list_copy (streams);
3268     SELECTION_UNLOCK (dbin);
3269
3270     /* FIXME : We don't have an upstream ?? */
3271 #if 0
3272     /* Send event upstream */
3273     if ((peer = gst_pad_get_peer (pad))) {
3274       gst_pad_send_event (peer, event);
3275       gst_object_unref (peer);
3276     }
3277 #endif
3278     /* Finally handle the switch */
3279     if (streams) {
3280       handle_stream_switch (dbin, streams, seqnum);
3281       g_list_free_full (streams, g_free);
3282     }
3283
3284     gst_event_unref (event);
3285     return TRUE;
3286   }
3287   return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
3288 }
3289
3290
3291 static void
3292 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3293 {
3294   if (slot->probe_id)
3295     gst_pad_remove_probe (slot->src_pad, slot->probe_id);
3296   if (slot->input) {
3297     if (slot->input->srcpad)
3298       gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
3299   }
3300
3301   gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
3302   gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
3303   gst_object_replace ((GstObject **) & slot->src_pad, NULL);
3304   gst_object_replace ((GstObject **) & slot->active_stream, NULL);
3305   g_free (slot);
3306 }
3307
3308 static void
3309 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3310 {
3311   GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
3312   gst_element_call_async (GST_ELEMENT_CAST (dbin),
3313       (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
3314 }
3315
3316 /* Create a DecodebinOutputStream for a given type
3317  * Note: It will be empty initially, it needs to be configured
3318  * afterwards */
3319 static DecodebinOutputStream *
3320 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
3321 {
3322   DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
3323   gchar *pad_name;
3324   const gchar *prefix;
3325   GstStaticPadTemplate *templ;
3326   GstPadTemplate *ptmpl;
3327   guint32 *counter;
3328   GstPad *internal_pad;
3329
3330   GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
3331       res, gst_stream_type_get_name (type));
3332
3333   res->type = type;
3334   res->dbin = dbin;
3335   res->decoder_latency = GST_CLOCK_TIME_NONE;
3336
3337   if (type & GST_STREAM_TYPE_VIDEO) {
3338     templ = &video_src_template;
3339     counter = &dbin->vpadcount;
3340     prefix = "video";
3341   } else if (type & GST_STREAM_TYPE_AUDIO) {
3342     templ = &audio_src_template;
3343     counter = &dbin->apadcount;
3344     prefix = "audio";
3345   } else if (type & GST_STREAM_TYPE_TEXT) {
3346     templ = &text_src_template;
3347     counter = &dbin->tpadcount;
3348     prefix = "text";
3349   } else {
3350     templ = &src_template;
3351     counter = &dbin->opadcount;
3352     prefix = "src";
3353   }
3354
3355   pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
3356   *counter += 1;
3357   ptmpl = gst_static_pad_template_get (templ);
3358   res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
3359   gst_object_unref (ptmpl);
3360   g_free (pad_name);
3361   gst_pad_set_active (res->src_pad, TRUE);
3362   /* Put an event probe on the internal proxy pad to detect upstream
3363    * events */
3364   internal_pad =
3365       (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
3366   gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
3367       (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
3368   gst_object_unref (internal_pad);
3369
3370   dbin->output_streams = g_list_append (dbin->output_streams, res);
3371
3372   return res;
3373 }
3374
3375 static void
3376 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
3377 {
3378   if (output->slot) {
3379     if (output->decoder_sink && output->decoder)
3380       gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
3381
3382     output->slot->output = NULL;
3383     output->slot = NULL;
3384   }
3385   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
3386   gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
3387   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
3388   if (output->src_exposed) {
3389     gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
3390   }
3391   if (output->decoder) {
3392     gst_element_set_locked_state (output->decoder, TRUE);
3393     gst_element_set_state (output->decoder, GST_STATE_NULL);
3394     gst_bin_remove ((GstBin *) dbin, output->decoder);
3395   }
3396   g_free (output);
3397 }
3398
3399 static GstStateChangeReturn
3400 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
3401 {
3402   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3403   GstStateChangeReturn ret;
3404
3405   /* Upwards */
3406   switch (transition) {
3407     default:
3408       break;
3409   }
3410   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3411   if (ret == GST_STATE_CHANGE_FAILURE)
3412     goto beach;
3413
3414   switch (transition) {
3415     case GST_STATE_CHANGE_PAUSED_TO_READY:
3416     {
3417       GList *tmp;
3418
3419       /* Free output streams */
3420       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
3421         DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
3422         free_output_stream (dbin, output);
3423       }
3424       g_list_free (dbin->output_streams);
3425       dbin->output_streams = NULL;
3426       /* Free multiqueue slots */
3427       for (tmp = dbin->slots; tmp; tmp = tmp->next) {
3428         MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3429         free_multiqueue_slot (dbin, slot);
3430       }
3431       g_list_free (dbin->slots);
3432       dbin->slots = NULL;
3433       dbin->current_group_id = GST_GROUP_ID_INVALID;
3434       /* Free inputs */
3435       /* Reset the main input group id since it will get a new id on a new stream */
3436       dbin->main_input->group_id = GST_GROUP_ID_INVALID;
3437       /* Reset multiqueue to default interleave */
3438       g_object_set (dbin->multiqueue, "min-interleave-time",
3439           dbin->default_mq_min_interleave, NULL);
3440       dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
3441       dbin->upstream_selected = FALSE;
3442       g_list_free_full (dbin->active_selection, g_free);
3443       dbin->active_selection = NULL;
3444     }
3445       break;
3446     default:
3447       break;
3448   }
3449 beach:
3450   return ret;
3451 }