65814f2e0c425936516ed6e610beeab58f1f4262
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-base / gst / playback / gstdecodebin3.c
1 /* GStreamer
2  *
3  * Copyright (C) <2015> Centricular Ltd
4  *  @author: Edward Hervey <edward@centricular.com>
5  *  @author: Jan Schmidt <jan@centricular.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <glib.h>
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
30 #include <gst/gst.h>
31 #include <gst/pbutils/pbutils.h>
32
33 #include "gstplaybackelements.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
36
37 /**
38  * SECTION:element-decodebin3
39  * @title: decodebin3
40  *
41  * #GstBin that auto-magically constructs a decoding pipeline using available
42  * decoders and demuxers via auto-plugging. The output is raw audio, video
43  * or subtitle streams.
44  *
45  * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
46  *
47  * * supports publication and selection of stream information via
48  * GstStreamCollection messages and #GST_EVENT_SELECT_STREAMS events.
49  *
50  * * dynamically switches stream connections internally, and
51  * reuses decoder elements when stream selections change, so that in
52  * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53  * and only creates new elements when streams change and an existing decoder
54  * is not capable of handling the new format.
55  *
56  * * supports multiple input pads for the parallel decoding of auxiliary streams
57  * not muxed with the primary stream.
58  *
59  * * does not handle network stream buffering. decodebin3 expects that network stream
60  * buffering is handled upstream, before data is passed to it.
61  *
62  * > decodebin3 is still experimental API and a technology preview.
63  * > Its behaviour and exposed API is subject to change.
64  *
65  */
66
67 /*
68  * Global design
69  *
70  * 1) From sink pad to elementary streams (GstParseBin or identity)
71  *
72  * Note : If the incoming streams are push-based-only and are compatible with
73  * either the output caps or a potential decoder, the usage of parsebin is
74  * replaced by a simple passthrough identity element.
75  *
76  * The input sink pads are fed to GstParseBin. GstParseBin will feed them
77  * through typefind. When the caps are detected (or changed) we recursively
78  * figure out which demuxer, parser or depayloader is needed until we get to
79  * elementary streams.
80  *
81  * All elementary streams (whether decoded or not, whether exposed or not) are
82  * fed through multiqueue. There is only *one* multiqueue in decodebin3.
83  *
84  * => MultiQueue is the cornerstone.
85  * => No buffering before multiqueue
86  *
87  * 2) Elementary streams
88  *
89  * After GstParseBin, there are 3 main components:
90  *  1) Input Streams (provided by GstParseBin)
91  *  2) Multiqueue slots
92  *  3) Output Streams
93  *
94  * Input Streams correspond to the stream coming from GstParseBin and that gets
95  * fed into a multiqueue slot.
96  *
97  * Output Streams correspond to the combination of a (optional) decoder and an
98  * output ghostpad. Output Streams can be moved from one multiqueue slot to
99  * another, can reconfigure itself (different decoders), and can be
100  * added/removed depending on the configuration (all streams outputted, only one
101  * of each type, ...).
102  *
103  * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
104  * each 'active' Input Stream there is a corresponding slot.
105  * Slots might have different streams on input and output (due to internal
106  * buffering).
107  *
108  * Due to internal queuing/buffering/..., all those components (might) behave
109  * asynchronously. Therefore probes will be used on each component source pad to
110  * detect various key-points:
111  *  * EOS :
112  *     the stream is done => Mark that component as done, optionally freeing/removing it
113  *  * STREAM_START :
114  *     a new stream is starting => link it further if needed
115  *
116  * 3) Gradual replacement
117  *
118  * If the caps change at any point in decodebin (input sink pad, demuxer output,
119  * multiqueue output, ..), we gradually replace (if needed) the following elements.
120  *
121  * This is handled by the probes in various locations:
122  *  a) typefind output
123  *  b) multiqueue input (source pad of Input Streams)
124  *  c) multiqueue output (source pad of Multiqueue Slots)
125  *  d) final output (target of source ghostpads)
126  *
127  * When CAPS event arrive at those points, one of three things can happen:
128  * a) There is no elements downstream yet, just create/link-to following elements
129  * b) There are downstream elements, do a ACCEPT_CAPS query
130  *  b.1) The new CAPS are accepted, keep current configuration
131  *  b.2) The new CAPS are not accepted, remove following elements then do a)
132  *
133  *    Components:
134  *
135  *                                                   MultiQ     Output
136  *                     Input(s)                      Slots      Streams
137  *  /-------------------------------------------\   /-----\  /------------- \
138  *
139  * +-------------------------------------------------------------------------+
140  * |                                                                         |
141  * | +---------------------------------------------+                         |
142  * | |   GstParseBin(s)                            |                         |
143  * | |                +--------------+             |  +-----+                |
144  * | |                |              |---[parser]-[|--| Mul |---[ decoder ]-[|
145  * |]--[ typefind ]---|  demuxer(s)  |------------[|  | ti  |                |
146  * | |                |  (if needed) |---[parser]-[|--| qu  |                |
147  * | |                |              |---[parser]-[|--| eu  |---[ decoder ]-[|
148  * | |                +--------------+             |  +------             ^  |
149  * | +---------------------------------------------+        ^             |  |
150  * |                                               ^        |             |  |
151  * +-----------------------------------------------+--------+-------------+--+
152  *                                                 |        |             |
153  *                                                 |        |             |
154  *                                       Probes  --/--------/-------------/
155  *
156  * ATOMIC SWITCHING
157  *
158  * We want to ensure we re-use decoders when switching streams. This takes place
159  * at the multiqueue output level.
160  *
161  * MAIN CONCEPTS
162  *  1) Activating a stream (i.e. linking a slot to an output) is only done within
163  *    the streaming thread in the multiqueue_src_probe() and only if the
164  *    stream is in the REQUESTED selection.
165  *  2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
166  *    within the stream thread, but only in a purposefully called IDLE probe
167  *    that calls reassign_slot().
168  *
169  * Based on those two principles, 3 "selection" of streams (stream-id) are used:
170  * 1) requested_selection
171  *    All streams within that list should be activated
172  * 2) active_selection
173  *    List of streams that are exposed by decodebin
174  * 3) to_activate
175  *    List of streams that will be moved to requested_selection in the
176  *    reassign_slot() method (i.e. once a stream was deactivated, and the output
177  *    was retargetted)
178  */
179
180
181 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
182 #define GST_CAT_DEFAULT decodebin3_debug
183
184 #define GST_TYPE_DECODEBIN3      (gst_decodebin3_get_type ())
185
186 #define EXTRA_DEBUG 1
187
188 #define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
189 #define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"
190 static GQuark
191 _custom_final_eos_quark_get (void)
192 {
193   static gsize g_quark;
194
195   if (g_once_init_enter (&g_quark)) {
196     gsize quark =
197         (gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
198     g_once_init_leave (&g_quark, quark);
199   }
200   return g_quark;
201 }
202
203 typedef struct _GstDecodebin3 GstDecodebin3;
204 typedef struct _GstDecodebin3Class GstDecodebin3Class;
205
206 typedef struct _DecodebinInputStream DecodebinInputStream;
207 typedef struct _DecodebinInput DecodebinInput;
208 typedef struct _DecodebinOutputStream DecodebinOutputStream;
209
210 struct _GstDecodebin3
211 {
212   GstBin bin;
213
214   /* input_lock protects the following variables */
215   GMutex input_lock;
216   /* Main input (static sink pad) */
217   DecodebinInput *main_input;
218   /* Supplementary input (request sink pads) */
219   GList *other_inputs;
220   /* counter for input */
221   guint32 input_counter;
222   /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
223   guint32 current_group_id;
224   /* End of variables protected by input_lock */
225
226   GstElement *multiqueue;
227   GstClockTime default_mq_min_interleave;
228   GstClockTime current_mq_min_interleave;
229
230   /* selection_lock protects access to following variables */
231   GMutex selection_lock;
232   GList *input_streams;         /* List of DecodebinInputStream for active collection */
233   GList *output_streams;        /* List of DecodebinOutputStream used for output */
234   GList *slots;                 /* List of MultiQueueSlot */
235   guint slot_id;
236
237   /* Active collection */
238   GstStreamCollection *collection;
239   /* requested selection of stream-id to activate post-multiqueue */
240   GList *requested_selection;
241   /* list of stream-id currently activated in output */
242   GList *active_selection;
243   /* List of stream-id that need to be activated (after a stream switch for ex) */
244   GList *to_activate;
245   /* Pending select streams event */
246   guint32 select_streams_seqnum;
247   /* pending list of streams to select (from downstream) */
248   GList *pending_select_streams;
249   /* TRUE if requested_selection was updated, will become FALSE once
250    * it has fully transitioned to active */
251   gboolean selection_updated;
252   /* End of variables protected by selection_lock */
253   gboolean upstream_selected;
254
255   /* Factories */
256   GMutex factories_lock;
257   guint32 factories_cookie;
258   /* All DECODABLE factories */
259   GList *factories;
260   /* Only DECODER factories */
261   GList *decoder_factories;
262   /* DECODABLE but not DECODER factories */
263   GList *decodable_factories;
264
265   /* counters for pads */
266   guint32 apadcount, vpadcount, tpadcount, opadcount;
267
268   /* Properties */
269   GstCaps *caps;
270 };
271
272 struct _GstDecodebin3Class
273 {
274   GstBinClass class;
275
276     gint (*select_stream) (GstDecodebin3 * dbin,
277       GstStreamCollection * collection, GstStream * stream);
278 };
279
280 /* Input of decodebin, controls input pad and parsebin */
281 struct _DecodebinInput
282 {
283   GstDecodebin3 *dbin;
284
285   gboolean is_main;
286
287   GstPad *ghost_sink;
288   GstPad *parsebin_sink;
289
290   GstStreamCollection *collection;      /* Active collection */
291   gboolean upstream_selected;
292
293   guint group_id;
294
295   /* Either parsebin or identity is used */
296   GstElement *parsebin;
297   GstElement *identity;
298
299   gulong pad_added_sigid;
300   gulong pad_removed_sigid;
301   gulong drained_sigid;
302
303   /* TRUE if the input got drained */
304   gboolean drained;
305 };
306
307 /* Multiqueue Slots */
308 typedef struct _MultiQueueSlot
309 {
310   guint id;
311
312   GstDecodebin3 *dbin;
313   /* Type of stream handled by this slot */
314   GstStreamType type;
315
316   /* Linked input and output */
317   DecodebinInputStream *input;
318
319   /* pending => last stream received on sink pad */
320   GstStream *pending_stream;
321   /* active => last stream outputted on source pad */
322   GstStream *active_stream;
323
324   GstPad *sink_pad, *src_pad;
325
326   /* id of the MQ src_pad event probe */
327   gulong probe_id;
328
329   /* TRUE if EOS was pushed out by multiqueue */
330   gboolean is_drained;
331
332   DecodebinOutputStream *output;
333 } MultiQueueSlot;
334
335 /* Streams that are exposed downstream (i.e. output) */
336 struct _DecodebinOutputStream
337 {
338   GstDecodebin3 *dbin;
339   /* The type of stream handled by this output stream */
340   GstStreamType type;
341
342   /* The slot to which this output stream is currently connected to */
343   MultiQueueSlot *slot;
344
345   GstElement *decoder;          /* Optional */
346   GstPad *decoder_sink, *decoder_src;
347   gboolean linked;
348
349   /* ghostpad */
350   GstPad *src_pad;
351   /* Flag if ghost pad is exposed */
352   gboolean src_exposed;
353
354   /* Reported decoder latency */
355   GstClockTime decoder_latency;
356
357   /* keyframe dropping probe */
358   gulong drop_probe_id;
359 };
360
361 /* properties */
362 enum
363 {
364   PROP_0,
365   PROP_CAPS
366 };
367
368 /* signals */
369 enum
370 {
371   SIGNAL_SELECT_STREAM,
372   SIGNAL_ABOUT_TO_FINISH,
373   LAST_SIGNAL
374 };
375 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
376
377 #define SELECTION_LOCK(dbin) G_STMT_START {                             \
378     GST_LOG_OBJECT (dbin,                                               \
379                     "selection locking from thread %p",                 \
380                     g_thread_self ());                                  \
381     g_mutex_lock (&dbin->selection_lock);                               \
382     GST_LOG_OBJECT (dbin,                                               \
383                     "selection locked from thread %p",                  \
384                     g_thread_self ());                                  \
385   } G_STMT_END
386
387 #define SELECTION_UNLOCK(dbin) G_STMT_START {                           \
388     GST_LOG_OBJECT (dbin,                                               \
389                     "selection unlocking from thread %p",               \
390                     g_thread_self ());                                  \
391     g_mutex_unlock (&dbin->selection_lock);                             \
392   } G_STMT_END
393
394 #define INPUT_LOCK(dbin) G_STMT_START {                         \
395     GST_LOG_OBJECT (dbin,                                               \
396                     "input locking from thread %p",                     \
397                     g_thread_self ());                                  \
398     g_mutex_lock (&dbin->input_lock);                           \
399     GST_LOG_OBJECT (dbin,                                               \
400                     "input locked from thread %p",                      \
401                     g_thread_self ());                                  \
402   } G_STMT_END
403
404 #define INPUT_UNLOCK(dbin) G_STMT_START {                               \
405     GST_LOG_OBJECT (dbin,                                               \
406                     "input unlocking from thread %p",           \
407                     g_thread_self ());                                  \
408     g_mutex_unlock (&dbin->input_lock);                         \
409   } G_STMT_END
410
411 GType gst_decodebin3_get_type (void);
412 #define gst_decodebin3_parent_class parent_class
413 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
414 #define _do_init \
415     GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");\
416     playback_element_init (plugin);
417 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (decodebin3, "decodebin3", GST_RANK_NONE,
418     GST_TYPE_DECODEBIN3, _do_init);
419
420 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
421
422 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
423     GST_PAD_SINK,
424     GST_PAD_ALWAYS,
425     GST_STATIC_CAPS_ANY);
426
427 static GstStaticPadTemplate request_sink_template =
428 GST_STATIC_PAD_TEMPLATE ("sink_%u",
429     GST_PAD_SINK,
430     GST_PAD_REQUEST,
431     GST_STATIC_CAPS_ANY);
432
433 static GstStaticPadTemplate video_src_template =
434 GST_STATIC_PAD_TEMPLATE ("video_%u",
435     GST_PAD_SRC,
436     GST_PAD_SOMETIMES,
437     GST_STATIC_CAPS_ANY);
438
439 static GstStaticPadTemplate audio_src_template =
440 GST_STATIC_PAD_TEMPLATE ("audio_%u",
441     GST_PAD_SRC,
442     GST_PAD_SOMETIMES,
443     GST_STATIC_CAPS_ANY);
444
445 static GstStaticPadTemplate text_src_template =
446 GST_STATIC_PAD_TEMPLATE ("text_%u",
447     GST_PAD_SRC,
448     GST_PAD_SOMETIMES,
449     GST_STATIC_CAPS_ANY);
450
451 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
452     GST_PAD_SRC,
453     GST_PAD_SOMETIMES,
454     GST_STATIC_CAPS_ANY);
455
456
457 static void gst_decodebin3_dispose (GObject * object);
458 static void gst_decodebin3_finalize (GObject * object);
459 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
460     const GValue * value, GParamSpec * pspec);
461 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
462     GValue * value, GParamSpec * pspec);
463
464 static gboolean parsebin_autoplug_continue_cb (GstElement *
465     parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
466
467 static gint
468 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
469     GstStreamCollection * collection, GstStream * stream)
470 {
471   GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
472
473   return -1;
474 }
475
476 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
477     GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
478 static void gst_decodebin3_release_pad (GstElement * element, GstPad * pad);
479 static void handle_stream_collection (GstDecodebin3 * dbin,
480     GstStreamCollection * collection, DecodebinInput * input);
481 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
482 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
483     GstStateChange transition);
484 static gboolean gst_decodebin3_send_event (GstElement * element,
485     GstEvent * event);
486
487 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
488 #if 0
489 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
490     GstElementFactoryListType ftype);
491 #endif
492
493 static void reset_input (GstDecodebin3 * dbin, DecodebinInput * input);
494 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
495 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
496 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
497
498 static void reconfigure_output_stream (DecodebinOutputStream * output,
499     MultiQueueSlot * slot);
500 static void free_output_stream (GstDecodebin3 * dbin,
501     DecodebinOutputStream * output);
502 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
503     GstStreamType type);
504
505 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
506     GstPadProbeInfo * info, MultiQueueSlot * slot);
507 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
508 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
509     DecodebinInputStream * input);
510 static void link_input_to_slot (DecodebinInputStream * input,
511     MultiQueueSlot * slot);
512 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
513 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
514     MultiQueueSlot * slot);
515
516 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
517 static void update_requested_selection (GstDecodebin3 * dbin);
518
519 /* FIXME: Really make all the parser stuff a self-contained helper object */
520 #include "gstdecodebin3-parse.c"
521
522 static gboolean
523 _gst_int_accumulator (GSignalInvocationHint * ihint,
524     GValue * return_accu, const GValue * handler_return, gpointer dummy)
525 {
526   gint res = g_value_get_int (handler_return);
527
528   g_value_set_int (return_accu, res);
529
530   if (res == -1)
531     return TRUE;
532
533   return FALSE;
534 }
535
536 static void
537 gst_decodebin3_class_init (GstDecodebin3Class * klass)
538 {
539   GObjectClass *gobject_klass = (GObjectClass *) klass;
540   GstElementClass *element_class = (GstElementClass *) klass;
541   GstBinClass *bin_klass = (GstBinClass *) klass;
542
543   gobject_klass->dispose = gst_decodebin3_dispose;
544   gobject_klass->finalize = gst_decodebin3_finalize;
545   gobject_klass->set_property = gst_decodebin3_set_property;
546   gobject_klass->get_property = gst_decodebin3_get_property;
547
548   g_object_class_install_property (gobject_klass, PROP_CAPS,
549       g_param_spec_boxed ("caps", "Caps",
550           "The caps on which to stop decoding. (NULL = default)",
551           GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
552
553   /**
554    * GstDecodebin3::select-stream
555    * @decodebin: a #GstDecodebin3
556    * @collection: a #GstStreamCollection
557    * @stream: a #GstStream
558    *
559    * This signal is emitted whenever @decodebin needs to decide whether
560    * to expose a @stream of a given @collection.
561    *
562    * Note that the prefered way to select streams is to listen to
563    * GST_MESSAGE_STREAM_COLLECTION on the bus and send a
564    * GST_EVENT_SELECT_STREAMS with the streams the user wants.
565    *
566    * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
567    * A value of -1 (default) lets @decodebin decide what to do with the stream.
568    * */
569   gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
570       g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
571       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
572       _gst_int_accumulator, NULL, NULL,
573       G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
574
575   /**
576    * GstDecodebin3::about-to-finish:
577    *
578    * This signal is emitted when the data for the selected URI is
579    * entirely buffered and it is safe to specify another URI.
580    */
581   gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
582       g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
583       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
584
585
586   element_class->request_new_pad =
587       GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
588   element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
589   element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
590   element_class->release_pad = GST_DEBUG_FUNCPTR (gst_decodebin3_release_pad);
591
592   gst_element_class_add_pad_template (element_class,
593       gst_static_pad_template_get (&sink_template));
594   gst_element_class_add_pad_template (element_class,
595       gst_static_pad_template_get (&request_sink_template));
596   gst_element_class_add_pad_template (element_class,
597       gst_static_pad_template_get (&video_src_template));
598   gst_element_class_add_pad_template (element_class,
599       gst_static_pad_template_get (&audio_src_template));
600   gst_element_class_add_pad_template (element_class,
601       gst_static_pad_template_get (&text_src_template));
602   gst_element_class_add_pad_template (element_class,
603       gst_static_pad_template_get (&src_template));
604
605   gst_element_class_set_static_metadata (element_class,
606       "Decoder Bin 3", "Generic/Bin/Decoder",
607       "Autoplug and decode to raw media",
608       "Edward Hervey <edward@centricular.com>");
609
610   bin_klass->handle_message = gst_decodebin3_handle_message;
611
612   klass->select_stream = gst_decodebin3_select_stream;
613 }
614
615 static void
616 gst_decodebin3_init (GstDecodebin3 * dbin)
617 {
618   /* Create main input */
619   dbin->main_input = create_new_input (dbin, TRUE);
620
621   dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
622   g_object_get (dbin->multiqueue, "min-interleave-time",
623       &dbin->default_mq_min_interleave, NULL);
624   dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
625   g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
626       "max-size-buffers", 0, "use-interleave", TRUE, NULL);
627   gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
628
629   dbin->current_group_id = GST_GROUP_ID_INVALID;
630
631   g_mutex_init (&dbin->factories_lock);
632   g_mutex_init (&dbin->selection_lock);
633   g_mutex_init (&dbin->input_lock);
634
635   dbin->caps = gst_static_caps_get (&default_raw_caps);
636
637   GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
638 }
639
640 static void
641 gst_decodebin3_reset (GstDecodebin3 * dbin)
642 {
643   GList *tmp;
644
645   GST_DEBUG_OBJECT (dbin, "Resetting");
646
647   /* Free output streams */
648   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
649     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
650     free_output_stream (dbin, output);
651   }
652   g_list_free (dbin->output_streams);
653   dbin->output_streams = NULL;
654
655   /* Free multiqueue slots */
656   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
657     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
658     free_multiqueue_slot (dbin, slot);
659   }
660   g_list_free (dbin->slots);
661   dbin->slots = NULL;
662   dbin->current_group_id = GST_GROUP_ID_INVALID;
663
664   /* Reset the inputs */
665   reset_input (dbin, dbin->main_input);
666   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
667     reset_input (dbin, tmp->data);
668   }
669
670   /* Reset multiqueue to default interleave */
671   g_object_set (dbin->multiqueue, "min-interleave-time",
672       dbin->default_mq_min_interleave, NULL);
673   dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
674   dbin->upstream_selected = FALSE;
675
676   g_list_free_full (dbin->requested_selection, g_free);
677   dbin->requested_selection = NULL;
678
679   g_list_free_full (dbin->active_selection, g_free);
680   dbin->active_selection = NULL;
681
682   g_list_free (dbin->to_activate);
683   dbin->to_activate = NULL;
684
685   g_list_free (dbin->pending_select_streams);
686   dbin->pending_select_streams = NULL;
687
688   dbin->selection_updated = FALSE;
689 }
690
691 static void
692 gst_decodebin3_dispose (GObject * object)
693 {
694   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
695   GList *walk, *next;
696
697   gst_decodebin3_reset (dbin);
698
699   if (dbin->factories) {
700     gst_plugin_feature_list_free (dbin->factories);
701     dbin->factories = NULL;
702   }
703   if (dbin->decoder_factories) {
704     g_list_free (dbin->decoder_factories);
705     dbin->decoder_factories = NULL;
706   }
707   if (dbin->decodable_factories) {
708     g_list_free (dbin->decodable_factories);
709     dbin->decodable_factories = NULL;
710   }
711
712   gst_clear_object (&dbin->collection);
713
714   if (dbin->main_input) {
715     free_input (dbin, dbin->main_input);
716     dbin->main_input = NULL;
717   }
718
719   for (walk = dbin->other_inputs; walk; walk = next) {
720     DecodebinInput *input = walk->data;
721
722     next = g_list_next (walk);
723
724     free_input (dbin, input);
725     dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
726   }
727
728   G_OBJECT_CLASS (parent_class)->dispose (object);
729 }
730
731 static void
732 gst_decodebin3_finalize (GObject * object)
733 {
734   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
735
736   g_mutex_clear (&dbin->factories_lock);
737   g_mutex_clear (&dbin->selection_lock);
738   g_mutex_clear (&dbin->input_lock);
739
740   G_OBJECT_CLASS (parent_class)->finalize (object);
741 }
742
743 static void
744 gst_decodebin3_set_property (GObject * object, guint prop_id,
745     const GValue * value, GParamSpec * pspec)
746 {
747   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
748
749   switch (prop_id) {
750     case PROP_CAPS:
751       GST_OBJECT_LOCK (dbin);
752       if (dbin->caps)
753         gst_caps_unref (dbin->caps);
754       dbin->caps = g_value_dup_boxed (value);
755       GST_OBJECT_UNLOCK (dbin);
756       break;
757     default:
758       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
759       break;
760   }
761 }
762
763 static void
764 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
765     GParamSpec * pspec)
766 {
767   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
768
769   switch (prop_id) {
770     case PROP_CAPS:
771       GST_OBJECT_LOCK (dbin);
772       g_value_set_boxed (value, dbin->caps);
773       GST_OBJECT_UNLOCK (dbin);
774       break;
775     default:
776       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
777       break;
778   }
779 }
780
781 static gboolean
782 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
783     GstCaps * caps, GstDecodebin3 * dbin)
784 {
785   GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
786
787   /* If it matches our target caps, expose it */
788   if (gst_caps_can_intersect (caps, dbin->caps))
789     return FALSE;
790
791   return TRUE;
792 }
793
794 /* This method should be called whenever a STREAM_START event
795  * comes out of a given parsebin.
796  * The caller shall replace the group_id if the function returns TRUE */
797 static gboolean
798 set_input_group_id (DecodebinInput * input, guint32 * group_id)
799 {
800   GstDecodebin3 *dbin = input->dbin;
801
802   if (input->group_id != *group_id) {
803     if (input->group_id != GST_GROUP_ID_INVALID)
804       GST_WARNING_OBJECT (dbin,
805           "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
806           ") on input %p ", input->group_id, *group_id, input);
807     input->group_id = *group_id;
808   }
809
810   if (*group_id != dbin->current_group_id) {
811     /* The input is being re-used with a different incoming stream, we do want
812      * to change/unify to this new group-id */
813     if (dbin->current_group_id == GST_GROUP_ID_INVALID) {
814       GST_DEBUG_OBJECT (dbin,
815           "Setting current group id to %" G_GUINT32_FORMAT, *group_id);
816       dbin->current_group_id = *group_id;
817     }
818     *group_id = dbin->current_group_id;
819     return TRUE;
820   }
821
822   return FALSE;
823 }
824
825 static void
826 parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
827 {
828   GstDecodebin3 *dbin = input->dbin;
829   gboolean all_drained;
830   GList *tmp;
831
832   GST_INFO_OBJECT (dbin, "input %p drained", input);
833   input->drained = TRUE;
834
835   all_drained = dbin->main_input->drained;
836   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
837     DecodebinInput *data = (DecodebinInput *) tmp->data;
838
839     all_drained &= data->drained;
840   }
841
842   if (all_drained) {
843     GST_INFO_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
844     g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
845         NULL);
846   }
847 }
848
849 /* Call with INPUT_LOCK taken */
850 static gboolean
851 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
852 {
853   gboolean set_state = FALSE;
854
855   if (input->parsebin == NULL) {
856     input->parsebin = gst_element_factory_make ("parsebin", NULL);
857     if (input->parsebin == NULL)
858       goto no_parsebin;
859     input->parsebin = gst_object_ref (input->parsebin);
860     input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
861     input->pad_added_sigid =
862         g_signal_connect (input->parsebin, "pad-added",
863         (GCallback) parsebin_pad_added_cb, input);
864     input->pad_removed_sigid =
865         g_signal_connect (input->parsebin, "pad-removed",
866         (GCallback) parsebin_pad_removed_cb, input);
867     input->drained_sigid =
868         g_signal_connect (input->parsebin, "drained",
869         (GCallback) parsebin_drained_cb, input);
870     g_signal_connect (input->parsebin, "autoplug-continue",
871         (GCallback) parsebin_autoplug_continue_cb, dbin);
872   }
873
874   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
875     /* The state lock is taken so that we ensure we are the one (de)activating
876      * parsebin. We need to do this to ensure any activation taking place in
877      * parsebin (including by elements doing upstream activation) are done
878      * within the same thread. */
879     GST_STATE_LOCK (input->parsebin);
880     gst_bin_add (GST_BIN (dbin), input->parsebin);
881     set_state = TRUE;
882   }
883
884   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
885       input->parsebin_sink);
886
887   if (set_state) {
888     gst_element_sync_state_with_parent (input->parsebin);
889     GST_STATE_UNLOCK (input->parsebin);
890   }
891
892   return TRUE;
893
894   /* ERRORS */
895 no_parsebin:
896   {
897     gst_element_post_message ((GstElement *) dbin,
898         gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
899     return FALSE;
900   }
901 }
902
903 static GstPadLinkReturn
904 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
905 {
906   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
907   GstQuery *query;
908   gboolean pull_mode = FALSE;
909   GstPadLinkReturn res = GST_PAD_LINK_OK;
910   DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
911
912   g_return_val_if_fail (input, GST_PAD_LINK_REFUSED);
913
914   GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT, pad);
915
916   query = gst_query_new_scheduling ();
917   if (gst_pad_query (peer, query)
918       && gst_query_has_scheduling_mode_with_flags (query, GST_PAD_MODE_PULL,
919           GST_SCHEDULING_FLAG_SEEKABLE))
920     pull_mode = TRUE;
921   gst_query_unref (query);
922
923   GST_DEBUG_OBJECT (dbin, "Upstream can do pull-based : %d", pull_mode);
924
925   /* If upstream *can* do pull-based, we always use a parsebin. If not, we will
926    * delay that decision to a later stage (caps/stream/collection event
927    * processing) to figure out if one is really needed or whether an identity
928    * element will be enough */
929   INPUT_LOCK (dbin);
930   if (pull_mode) {
931     if (!ensure_input_parsebin (dbin, input))
932       res = GST_PAD_LINK_REFUSED;
933     else if (input->identity) {
934       GST_ERROR_OBJECT (dbin,
935           "Can't reconfigure input from push-based to pull-based");
936       res = GST_PAD_LINK_REFUSED;
937     }
938   }
939
940   /* Clear stream-collection corresponding to current INPUT.  We do not
941    * recalculate the global one yet, it will be done when at least one
942    * collection is received/computed for this input.
943    */
944   if (input->collection) {
945     GST_DEBUG_OBJECT (pad, "Clearing input collection");
946     gst_object_unref (input->collection);
947     input->collection = NULL;
948   }
949
950   INPUT_UNLOCK (dbin);
951
952   return res;
953 }
954
955 /* Drop duration query during _input_pad_unlink */
956 static GstPadProbeReturn
957 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
958     DecodebinInput * input)
959 {
960   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
961
962   if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
963     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
964     if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
965       GST_LOG_OBJECT (pad, "stop forwarding query duration");
966       ret = GST_PAD_PROBE_HANDLED;
967     }
968   }
969
970   return ret;
971 }
972
973 static void
974 recalculate_group_id (GstDecodebin3 * dbin)
975 {
976   guint32 common_group_id;
977   GList *iter;
978
979   common_group_id = dbin->main_input->group_id;
980
981   for (iter = dbin->other_inputs; iter; iter = iter->next) {
982     DecodebinInput *input = iter->data;
983
984     if (input->group_id != common_group_id)
985       return;
986   }
987
988   GST_DEBUG_OBJECT (dbin, "Updating global group_id to %" G_GUINT32_FORMAT,
989       common_group_id);
990   dbin->current_group_id = common_group_id;
991 }
992
993 /* CALL with INPUT LOCK */
994 static void
995 reset_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
996 {
997   GList *iter;
998
999   if (input->parsebin == NULL)
1000     return;
1001
1002   GST_DEBUG_OBJECT (dbin, "Resetting %" GST_PTR_FORMAT, input->parsebin);
1003
1004   GST_STATE_LOCK (dbin);
1005   gst_element_set_state (input->parsebin, GST_STATE_NULL);
1006   input->drained = FALSE;
1007   input->group_id = GST_GROUP_ID_INVALID;
1008   recalculate_group_id (dbin);
1009   for (iter = dbin->input_streams; iter; iter = iter->next) {
1010     DecodebinInputStream *istream = iter->data;
1011     if (istream->input == input)
1012       istream->saw_eos = TRUE;
1013   }
1014   gst_element_sync_state_with_parent (input->parsebin);
1015   GST_STATE_UNLOCK (dbin);
1016 }
1017
1018
1019 static void
1020 gst_decodebin3_input_pad_unlink (GstPad * pad, GstPad * peer,
1021     DecodebinInput * input)
1022 {
1023   GstDecodebin3 *dbin = input->dbin;
1024
1025   g_return_if_fail (input);
1026
1027   GST_LOG_OBJECT (dbin, "Got unlink on input pad %" GST_PTR_FORMAT, pad);
1028
1029   INPUT_LOCK (dbin);
1030   if (input->parsebin == NULL) {
1031     INPUT_UNLOCK (dbin);
1032     return;
1033   }
1034
1035   if (GST_PAD_MODE (pad) == GST_PAD_MODE_PULL) {
1036     GST_DEBUG_OBJECT (dbin, "Resetting parsebin since it's pull-based");
1037     reset_input_parsebin (dbin, input);
1038   }
1039
1040   INPUT_UNLOCK (dbin);
1041 }
1042
1043 static void
1044 gst_decodebin3_release_pad (GstElement * element, GstPad * pad)
1045 {
1046   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1047   DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
1048   GstStreamCollection *collection = NULL;
1049   gulong probe_id = 0;
1050   GstMessage *msg;
1051
1052   g_return_if_fail (input);
1053   GST_LOG_OBJECT (dbin, "Releasing pad %" GST_PTR_FORMAT, pad);
1054
1055   INPUT_LOCK (dbin);
1056
1057   /* Clear stream-collection corresponding to current INPUT and post new
1058    * stream-collection message, if needed */
1059   if (input->collection) {
1060     gst_object_unref (input->collection);
1061     input->collection = NULL;
1062   }
1063
1064   SELECTION_LOCK (dbin);
1065   collection = get_merged_collection (dbin);
1066   if (!collection) {
1067     SELECTION_UNLOCK (dbin);
1068     goto beach;
1069   }
1070   if (collection == dbin->collection) {
1071     SELECTION_UNLOCK (dbin);
1072     gst_object_unref (collection);
1073     goto beach;
1074   }
1075
1076   GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
1077
1078   if (dbin->collection)
1079     gst_object_unref (dbin->collection);
1080   dbin->collection = collection;
1081   dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1082
1083   msg =
1084       gst_message_new_stream_collection ((GstObject *) dbin, dbin->collection);
1085
1086   if (input->parsebin)
1087     /* Drop duration queries that the application might be doing while this message is posted */
1088     probe_id = gst_pad_add_probe (input->parsebin_sink,
1089         GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
1090         (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
1091
1092   SELECTION_UNLOCK (dbin);
1093   gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
1094   update_requested_selection (dbin);
1095
1096   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
1097   if (input->parsebin) {
1098     gst_bin_remove (GST_BIN (dbin), input->parsebin);
1099     gst_element_set_state (input->parsebin, GST_STATE_NULL);
1100     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
1101     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
1102     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
1103     gst_pad_remove_probe (input->parsebin_sink, probe_id);
1104     gst_object_unref (input->parsebin);
1105     gst_object_unref (input->parsebin_sink);
1106
1107     input->parsebin = NULL;
1108     input->parsebin_sink = NULL;
1109   }
1110   if (input->identity) {
1111     GstPad *idpad = gst_element_get_static_pad (input->identity, "src");
1112     DecodebinInputStream *stream = find_input_stream_for_pad (dbin, idpad);
1113     gst_object_unref (idpad);
1114     remove_input_stream (dbin, stream);
1115     gst_element_set_state (input->identity, GST_STATE_NULL);
1116     gst_bin_remove (GST_BIN (dbin), input->identity);
1117     gst_object_unref (input->identity);
1118     input->identity = NULL;
1119   }
1120
1121   if (!input->is_main) {
1122     dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
1123     free_input (dbin, input);
1124   }
1125
1126 beach:
1127   INPUT_UNLOCK (dbin);
1128   return;
1129 }
1130
1131 /* Call with INPUT LOCK */
1132 static void
1133 reset_input (GstDecodebin3 * dbin, DecodebinInput * input)
1134 {
1135   GST_LOG_OBJECT (dbin, "Resetting input %p", input);
1136
1137   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
1138
1139   if (input->parsebin) {
1140     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
1141     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
1142     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
1143     gst_element_set_state (input->parsebin, GST_STATE_NULL);
1144     gst_clear_object (&input->parsebin);
1145     gst_clear_object (&input->parsebin_sink);
1146   }
1147   if (input->identity) {
1148     GstPad *idpad = gst_element_get_static_pad (input->identity, "src");
1149     DecodebinInputStream *stream = find_input_stream_for_pad (dbin, idpad);
1150     gst_object_unref (idpad);
1151     remove_input_stream (dbin, stream);
1152     gst_element_set_state (input->identity, GST_STATE_NULL);
1153     gst_clear_object (&input->identity);
1154   }
1155   if (input->collection)
1156     gst_clear_object (&input->collection);
1157
1158   input->group_id = GST_GROUP_ID_INVALID;
1159 }
1160
1161 /* Call with INPUT LOCK */
1162 static void
1163 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
1164 {
1165   reset_input (dbin, input);
1166
1167   GST_LOG_OBJECT (dbin, "Freeing input %p", input);
1168
1169   gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
1170   g_free (input);
1171 }
1172
1173 static gboolean
1174 sink_query_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstQuery * query)
1175 {
1176   DecodebinInput *input =
1177       g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1178
1179   g_return_val_if_fail (input, FALSE);
1180
1181   GST_DEBUG_OBJECT (sinkpad, "query %" GST_PTR_FORMAT, query);
1182
1183   /* We accept any caps, since we will reconfigure ourself internally if the new
1184    * stream is incompatible */
1185   if (GST_QUERY_TYPE (query) == GST_QUERY_ACCEPT_CAPS) {
1186     GST_DEBUG_OBJECT (dbin, "Accepting ACCEPT_CAPS query");
1187     gst_query_set_accept_caps_result (query, TRUE);
1188     return TRUE;
1189   }
1190   return gst_pad_query_default (sinkpad, GST_OBJECT (dbin), query);
1191 }
1192
1193 static gboolean
1194 is_parsebin_required_for_input (GstDecodebin3 * dbin, DecodebinInput * input,
1195     GstCaps * newcaps, GstPad * sinkpad)
1196 {
1197   gboolean parsebin_needed = TRUE;
1198   GstStream *stream;
1199
1200   stream = gst_pad_get_stream (sinkpad);
1201
1202   if (stream == NULL) {
1203     /* If upstream didn't provide a `GstStream` we will need to create a
1204      * parsebin to handle that stream */
1205     GST_DEBUG_OBJECT (sinkpad,
1206         "Need to create parsebin since upstream doesn't provide GstStream");
1207   } else if (gst_caps_can_intersect (newcaps, dbin->caps)) {
1208     /* If the incoming caps match decodebin3 output, no processing is needed */
1209     GST_FIXME_OBJECT (sinkpad, "parsebin not needed (matches output caps) !");
1210     parsebin_needed = FALSE;
1211   } else {
1212     GList *decoder_list;
1213     /* If the incoming caps are compatible with a decoder, we don't need to
1214      * process it before */
1215     g_mutex_lock (&dbin->factories_lock);
1216     gst_decode_bin_update_factories_list (dbin);
1217     decoder_list =
1218         gst_element_factory_list_filter (dbin->decoder_factories, newcaps,
1219         GST_PAD_SINK, TRUE);
1220     g_mutex_unlock (&dbin->factories_lock);
1221     if (decoder_list) {
1222       GST_FIXME_OBJECT (sinkpad, "parsebin not needed (available decoders) !");
1223       gst_plugin_feature_list_free (decoder_list);
1224       parsebin_needed = FALSE;
1225     }
1226   }
1227   if (stream)
1228     gst_object_unref (stream);
1229
1230   return parsebin_needed;
1231 }
1232
1233 static void
1234 setup_identify_for_input (GstDecodebin3 * dbin, DecodebinInput * input,
1235     GstPad * sinkpad)
1236 {
1237   GstPad *idsrc, *idsink;
1238   DecodebinInputStream *inputstream;
1239
1240   GST_DEBUG_OBJECT (sinkpad, "Adding identity for new input stream");
1241
1242   input->identity = gst_element_factory_make ("identity", NULL);
1243   /* We drop allocation queries due to our usage of multiqueue just
1244    * afterwards. It is just too dangerous.
1245    *
1246    * If application users want to have optimal raw source <=> sink allocations
1247    * they should not use decodebin3
1248    */
1249   g_object_set (input->identity, "drop-allocation", TRUE, NULL);
1250   input->identity = gst_object_ref (input->identity);
1251   idsink = gst_element_get_static_pad (input->identity, "sink");
1252   idsrc = gst_element_get_static_pad (input->identity, "src");
1253   gst_bin_add (GST_BIN (dbin), input->identity);
1254
1255   SELECTION_LOCK (dbin);
1256   inputstream = create_input_stream (dbin, idsrc, input);
1257   /* Forward any existing GstStream directly on the input stream */
1258   inputstream->active_stream = gst_pad_get_stream (sinkpad);
1259   SELECTION_UNLOCK (dbin);
1260
1261   gst_object_unref (idsrc);
1262   gst_object_unref (idsink);
1263   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), idsink);
1264   gst_element_sync_state_with_parent (input->identity);
1265 }
1266
1267 static gboolean
1268 sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
1269 {
1270   DecodebinInput *input =
1271       g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1272
1273   g_return_val_if_fail (input, FALSE);
1274
1275   GST_DEBUG_OBJECT (sinkpad, "event %" GST_PTR_FORMAT, event);
1276
1277   switch (GST_EVENT_TYPE (event)) {
1278     case GST_EVENT_STREAM_START:
1279     {
1280       GstQuery *q = gst_query_new_selectable ();
1281
1282       /* Query whether upstream can handle stream selection or not */
1283       if (gst_pad_peer_query (sinkpad, q)) {
1284         gst_query_parse_selectable (q, &input->upstream_selected);
1285         GST_DEBUG_OBJECT (sinkpad, "Upstream is selectable : %d",
1286             input->upstream_selected);
1287       } else {
1288         input->upstream_selected = FALSE;
1289         GST_DEBUG_OBJECT (sinkpad, "Upstream does not handle SELECTABLE query");
1290       }
1291       gst_query_unref (q);
1292
1293       /* FIXME : We force `decodebin3` to upstream selection mode if *any* of the
1294          inputs is. This means things might break if there's a mix */
1295       if (input->upstream_selected)
1296         dbin->upstream_selected = TRUE;
1297
1298       /* Make sure group ids will be recalculated */
1299       input->group_id = GST_GROUP_ID_INVALID;
1300       recalculate_group_id (dbin);
1301       break;
1302     }
1303     case GST_EVENT_STREAM_COLLECTION:
1304     {
1305       GstStreamCollection *collection = NULL;
1306
1307       gst_event_parse_stream_collection (event, &collection);
1308       if (collection) {
1309         INPUT_LOCK (dbin);
1310         handle_stream_collection (dbin, collection, input);
1311         gst_object_unref (collection);
1312         INPUT_UNLOCK (dbin);
1313         SELECTION_LOCK (dbin);
1314         /* Post the (potentially) updated collection */
1315         if (dbin->collection) {
1316           GstMessage *msg;
1317           msg =
1318               gst_message_new_stream_collection ((GstObject *) dbin,
1319               dbin->collection);
1320           SELECTION_UNLOCK (dbin);
1321           gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
1322           update_requested_selection (dbin);
1323         } else
1324           SELECTION_UNLOCK (dbin);
1325       }
1326
1327       /* If we are waiting to create an identity passthrough, do it now */
1328       if (!input->parsebin && !input->identity)
1329         setup_identify_for_input (dbin, input, sinkpad);
1330       break;
1331     }
1332     case GST_EVENT_CAPS:
1333     {
1334       GstCaps *newcaps = NULL;
1335
1336       gst_event_parse_caps (event, &newcaps);
1337       if (!newcaps)
1338         break;
1339       GST_DEBUG_OBJECT (sinkpad, "new caps %" GST_PTR_FORMAT, newcaps);
1340
1341       /* No parsebin or identity present, check if we can avoid creating one */
1342       if (!input->parsebin && !input->identity) {
1343         if (is_parsebin_required_for_input (dbin, input, newcaps, sinkpad)) {
1344           GST_DEBUG_OBJECT (sinkpad, "parsebin is required for input");
1345           ensure_input_parsebin (dbin, input);
1346           break;
1347         }
1348         GST_DEBUG_OBJECT (sinkpad,
1349             "parsebin not required. Will create identity passthrough element once we get the collection");
1350         break;
1351       }
1352
1353       if (input->identity) {
1354         if (is_parsebin_required_for_input (dbin, input, newcaps, sinkpad)) {
1355           GST_ERROR_OBJECT (sinkpad,
1356               "Switching from passthrough to parsebin on inputs is not supported !");
1357           gst_event_unref (event);
1358           return FALSE;
1359         }
1360         /* Nothing else to do here */
1361         break;
1362       }
1363
1364       /* Check if the parsebin present can handle the new caps */
1365       g_assert (input->parsebin);
1366       GST_DEBUG_OBJECT (sinkpad,
1367           "New caps, checking if they are compatible with existing parsebin");
1368       if (!gst_pad_query_accept_caps (input->parsebin_sink, newcaps)) {
1369         GST_DEBUG_OBJECT (sinkpad,
1370             "Parsebin doesn't accept the new caps %" GST_PTR_FORMAT, newcaps);
1371         /* Reset parsebin so that it reconfigures itself for the new stream format */
1372         INPUT_LOCK (dbin);
1373         reset_input_parsebin (dbin, input);
1374         INPUT_UNLOCK (dbin);
1375       } else {
1376         GST_DEBUG_OBJECT (sinkpad, "Parsebin accepts new caps");
1377       }
1378       break;
1379     }
1380     default:
1381       break;
1382   }
1383
1384   /* Chain to parent function */
1385   return gst_pad_event_default (sinkpad, GST_OBJECT (dbin), event);
1386 }
1387
1388 /* Call with INPUT_LOCK taken */
1389 static DecodebinInput *
1390 create_new_input (GstDecodebin3 * dbin, gboolean main)
1391 {
1392   DecodebinInput *input;
1393
1394   input = g_new0 (DecodebinInput, 1);
1395   input->dbin = dbin;
1396   input->is_main = main;
1397   input->group_id = GST_GROUP_ID_INVALID;
1398   if (main)
1399     input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
1400   else {
1401     gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
1402     input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
1403     g_free (pad_name);
1404   }
1405   input->upstream_selected = FALSE;
1406   g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
1407   gst_pad_set_event_function (input->ghost_sink,
1408       (GstPadEventFunction) sink_event_function);
1409   gst_pad_set_query_function (input->ghost_sink,
1410       (GstPadQueryFunction) sink_query_function);
1411   gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
1412   g_signal_connect (input->ghost_sink, "unlinked",
1413       (GCallback) gst_decodebin3_input_pad_unlink, input);
1414
1415   gst_pad_set_active (input->ghost_sink, TRUE);
1416   gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
1417
1418   return input;
1419
1420 }
1421
1422 static GstPad *
1423 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
1424     const gchar * name, const GstCaps * caps)
1425 {
1426   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1427   DecodebinInput *input;
1428   GstPad *res = NULL;
1429
1430   /* We are ignoring names for the time being, not sure it makes any sense
1431    * within the context of decodebin3 ... */
1432   input = create_new_input (dbin, FALSE);
1433   if (input) {
1434     INPUT_LOCK (dbin);
1435     dbin->other_inputs = g_list_append (dbin->other_inputs, input);
1436     res = input->ghost_sink;
1437     INPUT_UNLOCK (dbin);
1438   }
1439
1440   return res;
1441 }
1442
1443 /* Must be called with factories lock! */
1444 static void
1445 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
1446 {
1447   guint cookie;
1448
1449   cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
1450   if (!dbin->factories || dbin->factories_cookie != cookie) {
1451     GList *tmp;
1452     if (dbin->factories)
1453       gst_plugin_feature_list_free (dbin->factories);
1454     if (dbin->decoder_factories)
1455       g_list_free (dbin->decoder_factories);
1456     if (dbin->decodable_factories)
1457       g_list_free (dbin->decodable_factories);
1458     dbin->factories =
1459         gst_element_factory_list_get_elements
1460         (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
1461     dbin->factories =
1462         g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
1463     dbin->factories_cookie = cookie;
1464
1465     /* Filter decoder and other decodables */
1466     dbin->decoder_factories = NULL;
1467     dbin->decodable_factories = NULL;
1468     for (tmp = dbin->factories; tmp; tmp = tmp->next) {
1469       GstElementFactory *fact = (GstElementFactory *) tmp->data;
1470       if (gst_element_factory_list_is_type (fact,
1471               GST_ELEMENT_FACTORY_TYPE_DECODER))
1472         dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
1473       else
1474         dbin->decodable_factories =
1475             g_list_append (dbin->decodable_factories, fact);
1476     }
1477   }
1478 }
1479
1480 /* Must be called with appropriate lock if list is a protected variable */
1481 static const gchar *
1482 stream_in_list (GList * list, const gchar * sid)
1483 {
1484   GList *tmp;
1485
1486 #if EXTRA_DEBUG
1487   for (tmp = list; tmp; tmp = tmp->next) {
1488     gchar *osid = (gchar *) tmp->data;
1489     GST_DEBUG ("Checking %s against %s", sid, osid);
1490   }
1491 #endif
1492
1493   for (tmp = list; tmp; tmp = tmp->next) {
1494     const gchar *osid = (gchar *) tmp->data;
1495     if (!g_strcmp0 (sid, osid))
1496       return osid;
1497   }
1498
1499   return NULL;
1500 }
1501
1502 static gboolean
1503 stream_list_equal (GList * lista, GList * listb)
1504 {
1505   GList *tmp;
1506
1507   if (g_list_length (lista) != g_list_length (listb))
1508     return FALSE;
1509
1510   for (tmp = lista; tmp; tmp = tmp->next) {
1511     gchar *osid = tmp->data;
1512     if (!stream_in_list (listb, osid))
1513       return FALSE;
1514   }
1515
1516   return TRUE;
1517 }
1518
1519 static void
1520 update_requested_selection (GstDecodebin3 * dbin)
1521 {
1522   guint i, nb;
1523   GList *tmp = NULL;
1524   gboolean all_user_selected = TRUE;
1525   GstStreamType used_types = 0;
1526   GstStreamCollection *collection;
1527
1528   /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1529    *  the switch handler will take care of the pending selection */
1530   SELECTION_LOCK (dbin);
1531   if (dbin->pending_select_streams) {
1532     GST_DEBUG_OBJECT (dbin,
1533         "No need to create pending selection, SELECT_STREAMS underway");
1534     goto beach;
1535   }
1536
1537   collection = dbin->collection;
1538   if (G_UNLIKELY (collection == NULL)) {
1539     GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
1540     goto beach;
1541   }
1542   nb = gst_stream_collection_get_size (collection);
1543
1544   /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1545   GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1546
1547   /* 3. If not, check if we already have some of the streams in the
1548    * existing active/requested selection */
1549   for (i = 0; i < nb; i++) {
1550     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1551     const gchar *sid = gst_stream_get_stream_id (stream);
1552     gint request = -1;
1553     /* Fire select-stream signal to see if outside components want to
1554      * hint at which streams should be selected */
1555     g_signal_emit (G_OBJECT (dbin),
1556         gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1557         &request);
1558     GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1559
1560     if (request == -1)
1561       all_user_selected = FALSE;
1562     if (request == 1 || (request == -1
1563             && (stream_in_list (dbin->requested_selection, sid)
1564                 || stream_in_list (dbin->active_selection, sid)))) {
1565       GstStreamType curtype = gst_stream_get_stream_type (stream);
1566       if (request == 1)
1567         GST_DEBUG_OBJECT (dbin,
1568             "Using stream requested by 'select-stream' signal : %s", sid);
1569       else
1570         GST_DEBUG_OBJECT (dbin,
1571             "Re-using stream already present in requested or active selection : %s",
1572             sid);
1573       tmp = g_list_append (tmp, (gchar *) sid);
1574       used_types |= curtype;
1575     }
1576   }
1577
1578   /* 4. If the user didn't explicitly selected all streams, match one stream of each type */
1579   if (!all_user_selected && dbin->select_streams_seqnum == GST_SEQNUM_INVALID) {
1580     for (i = 0; i < nb; i++) {
1581       GstStream *stream = gst_stream_collection_get_stream (collection, i);
1582       GstStreamType curtype = gst_stream_get_stream_type (stream);
1583       if (curtype != GST_STREAM_TYPE_UNKNOWN && !(used_types & curtype)) {
1584         const gchar *sid = gst_stream_get_stream_id (stream);
1585         GST_DEBUG_OBJECT (dbin,
1586             "Automatically selecting stream '%s' of type %s", sid,
1587             gst_stream_type_get_name (curtype));
1588         tmp = g_list_append (tmp, (gchar *) sid);
1589         used_types |= curtype;
1590       }
1591     }
1592   }
1593
1594 beach:
1595   if (stream_list_equal (tmp, dbin->requested_selection)) {
1596     /* If the selection is equal, there is nothign to do */
1597     GST_DEBUG_OBJECT (dbin, "Dropping duplicate selection");
1598     g_list_free (tmp);
1599     tmp = NULL;
1600   }
1601
1602   if (tmp) {
1603     /* Finally set the requested selection */
1604     if (dbin->requested_selection) {
1605       GST_FIXME_OBJECT (dbin,
1606           "Replacing non-NULL requested_selection, what should we do ??");
1607       g_list_free_full (dbin->requested_selection, g_free);
1608     }
1609     dbin->requested_selection =
1610         g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1611     dbin->selection_updated = TRUE;
1612     g_list_free (tmp);
1613   }
1614   SELECTION_UNLOCK (dbin);
1615 }
1616
1617 /* sort_streams:
1618  * GCompareFunc to use with lists of GstStream.
1619  * Sorts GstStreams by stream type and SELECT flag and stream-id
1620  * First video, then audio, then others.
1621  *
1622  * Return: negative if a<b, 0 if a==b, positive if a>b
1623  */
1624 static gint
1625 sort_streams (GstStream * sa, GstStream * sb)
1626 {
1627   GstStreamType typea, typeb;
1628   GstStreamFlags flaga, flagb;
1629   const gchar *ida, *idb;
1630   gint ret = 0;
1631
1632   typea = gst_stream_get_stream_type (sa);
1633   typeb = gst_stream_get_stream_type (sb);
1634
1635   GST_LOG ("sa(%s), sb(%s)", gst_stream_get_stream_id (sa),
1636       gst_stream_get_stream_id (sb));
1637
1638   /* Sort by stream type. First video, then audio, then others(text, container, unknown) */
1639   if (typea != typeb) {
1640     if (typea & GST_STREAM_TYPE_VIDEO)
1641       ret = -1;
1642     else if (typea & GST_STREAM_TYPE_AUDIO)
1643       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)) ? -1 : 1;
1644     else if (typea & GST_STREAM_TYPE_TEXT)
1645       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)
1646           && !(typeb & GST_STREAM_TYPE_AUDIO)) ? -1 : 1;
1647     else if (typea & GST_STREAM_TYPE_CONTAINER)
1648       ret = (typeb & GST_STREAM_TYPE_UNKNOWN) ? -1 : 1;
1649     else
1650       ret = 1;
1651
1652     if (ret != 0) {
1653       GST_LOG ("Sort by stream-type: %d", ret);
1654       return ret;
1655     }
1656   }
1657
1658   /* Sort by SELECT flag, if stream type is same. */
1659   flaga = gst_stream_get_stream_flags (sa);
1660   flagb = gst_stream_get_stream_flags (sb);
1661
1662   ret =
1663       (flaga & GST_STREAM_FLAG_SELECT) ? ((flagb & GST_STREAM_FLAG_SELECT) ? 0 :
1664       -1) : ((flagb & GST_STREAM_FLAG_SELECT) ? 1 : 0);
1665
1666   if (ret != 0) {
1667     GST_LOG ("Sort by SELECT flag: %d", ret);
1668     return ret;
1669   }
1670
1671   /* Sort by stream-id, if otherwise the same. */
1672   ida = gst_stream_get_stream_id (sa);
1673   idb = gst_stream_get_stream_id (sb);
1674   ret = g_strcmp0 (ida, idb);
1675
1676   GST_LOG ("Sort by stream-id: %d", ret);
1677
1678   return ret;
1679 }
1680
1681 /* Call with INPUT_LOCK taken */
1682 static GstStreamCollection *
1683 get_merged_collection (GstDecodebin3 * dbin)
1684 {
1685   gboolean needs_merge = FALSE;
1686   GstStreamCollection *res = NULL;
1687   GList *tmp;
1688   GList *unsorted_streams = NULL;
1689   guint i, nb_stream;
1690
1691   /* First check if we need to do a merge or just return the only collection */
1692   res = dbin->main_input->collection;
1693
1694   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1695     DecodebinInput *input = (DecodebinInput *) tmp->data;
1696     GST_LOG_OBJECT (dbin, "Comparing res %p input->collection %p", res,
1697         input->collection);
1698     if (input->collection && input->collection != res) {
1699       if (res) {
1700         needs_merge = TRUE;
1701         break;
1702       }
1703       res = input->collection;
1704     }
1705   }
1706
1707   if (!needs_merge) {
1708     GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1709     return res ? gst_object_ref (res) : NULL;
1710   }
1711
1712   /* We really need to create a new collection */
1713   /* FIXME : Some numbering scheme maybe ?? */
1714   res = gst_stream_collection_new ("decodebin3");
1715   if (dbin->main_input->collection) {
1716     nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1717     GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1718     for (i = 0; i < nb_stream; i++) {
1719       GstStream *stream =
1720           gst_stream_collection_get_stream (dbin->main_input->collection, i);
1721       unsorted_streams = g_list_append (unsorted_streams, stream);
1722     }
1723   }
1724
1725   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1726     DecodebinInput *input = (DecodebinInput *) tmp->data;
1727     GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1728         input->collection);
1729     if (input->collection) {
1730       nb_stream = gst_stream_collection_get_size (input->collection);
1731       GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1732       for (i = 0; i < nb_stream; i++) {
1733         GstStream *stream =
1734             gst_stream_collection_get_stream (input->collection, i);
1735         /* Only add if not already present in the list */
1736         if (!g_list_find (unsorted_streams, stream))
1737           unsorted_streams = g_list_append (unsorted_streams, stream);
1738       }
1739     }
1740   }
1741
1742   /* re-order streams : video, then audio, then others */
1743   unsorted_streams =
1744       g_list_sort (unsorted_streams, (GCompareFunc) sort_streams);
1745   for (tmp = unsorted_streams; tmp; tmp = tmp->next) {
1746     GstStream *stream = (GstStream *) tmp->data;
1747     GST_DEBUG_OBJECT (dbin, "Adding #stream(%s) to collection",
1748         gst_stream_get_stream_id (stream));
1749     gst_stream_collection_add_stream (res, gst_object_ref (stream));
1750   }
1751
1752   if (unsorted_streams)
1753     g_list_free (unsorted_streams);
1754
1755   return res;
1756 }
1757
1758 /* Call with INPUT_LOCK taken */
1759 static DecodebinInput *
1760 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1761 {
1762   DecodebinInput *input = NULL;
1763   GstElement *parent = gst_object_ref (child);
1764   GList *tmp;
1765
1766   do {
1767     GstElement *next_parent;
1768
1769     GST_DEBUG_OBJECT (dbin, "parent %s",
1770         parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1771
1772     if (parent == dbin->main_input->parsebin) {
1773       input = dbin->main_input;
1774       break;
1775     }
1776     for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1777       DecodebinInput *cur = (DecodebinInput *) tmp->data;
1778       if (parent == cur->parsebin) {
1779         input = cur;
1780         break;
1781       }
1782     }
1783     next_parent = (GstElement *) gst_element_get_parent (parent);
1784     gst_object_unref (parent);
1785     parent = next_parent;
1786
1787   } while (parent && parent != (GstElement *) dbin);
1788
1789   if (parent)
1790     gst_object_unref (parent);
1791
1792   return input;
1793 }
1794
1795 static const gchar *
1796 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1797 {
1798   guint i, len;
1799
1800   if (dbin->collection == NULL)
1801     return NULL;
1802   len = gst_stream_collection_get_size (dbin->collection);
1803   for (i = 0; i < len; i++) {
1804     GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1805     const gchar *osid = gst_stream_get_stream_id (stream);
1806     if (!g_strcmp0 (sid, osid))
1807       return osid;
1808   }
1809
1810   return NULL;
1811 }
1812
1813 /* Call with INPUT_LOCK taken */
1814 static void
1815 handle_stream_collection (GstDecodebin3 * dbin,
1816     GstStreamCollection * collection, DecodebinInput * input)
1817 {
1818 #ifndef GST_DISABLE_GST_DEBUG
1819   const gchar *upstream_id;
1820   guint i;
1821 #endif
1822   if (!input) {
1823     GST_DEBUG_OBJECT (dbin,
1824         "Couldn't find corresponding input, most likely shutting down");
1825     return;
1826   }
1827
1828   /* Replace collection in input */
1829   if (input->collection)
1830     gst_object_unref (input->collection);
1831   input->collection = gst_object_ref (collection);
1832   GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1833       input);
1834
1835   /* Merge collection if needed */
1836   collection = get_merged_collection (dbin);
1837
1838 #ifndef GST_DISABLE_GST_DEBUG
1839   /* Just some debugging */
1840   upstream_id = gst_stream_collection_get_upstream_id (collection);
1841   GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1842   GST_DEBUG ("From input %p", input);
1843   GST_DEBUG ("  %d streams", gst_stream_collection_get_size (collection));
1844   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1845     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1846     GstTagList *taglist;
1847     GstCaps *caps;
1848
1849     GST_DEBUG ("   Stream '%s'", gst_stream_get_stream_id (stream));
1850     GST_DEBUG ("     type  : %s",
1851         gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1852     GST_DEBUG ("     flags : 0x%x", gst_stream_get_stream_flags (stream));
1853     taglist = gst_stream_get_tags (stream);
1854     GST_DEBUG ("     tags  : %" GST_PTR_FORMAT, taglist);
1855     caps = gst_stream_get_caps (stream);
1856     GST_DEBUG ("     caps  : %" GST_PTR_FORMAT, caps);
1857     if (taglist)
1858       gst_tag_list_unref (taglist);
1859     if (caps)
1860       gst_caps_unref (caps);
1861   }
1862 #endif
1863
1864   /* Store collection for later usage */
1865   SELECTION_LOCK (dbin);
1866   if (dbin->collection == NULL) {
1867     dbin->collection = collection;
1868   } else {
1869     /* We need to check who emitted this collection (the owner).
1870      * If we already had a collection from that user, this one is an update,
1871      * that is to say that we need to figure out how we are going to re-use
1872      * the streams/slot */
1873     GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1874     /* FIXME : When do we switch from pending collection to active collection ?
1875      * When all streams from active collection are drained in multiqueue output ? */
1876     gst_object_unref (dbin->collection);
1877     dbin->collection = collection;
1878   }
1879   dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1880   SELECTION_UNLOCK (dbin);
1881 }
1882
1883 /* Must be called with the selection lock taken */
1884 static void
1885 gst_decodebin3_update_min_interleave (GstDecodebin3 * dbin)
1886 {
1887   GstClockTime max_latency = GST_CLOCK_TIME_NONE;
1888   GList *tmp;
1889
1890   GST_DEBUG_OBJECT (dbin, "Recalculating max latency of decoders");
1891   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1892     DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1893     if (GST_CLOCK_TIME_IS_VALID (out->decoder_latency)) {
1894       if (max_latency == GST_CLOCK_TIME_NONE
1895           || out->decoder_latency > max_latency)
1896         max_latency = out->decoder_latency;
1897     }
1898   }
1899   GST_DEBUG_OBJECT (dbin, "max latency of all decoders: %" GST_TIME_FORMAT,
1900       GST_TIME_ARGS (max_latency));
1901
1902   if (!GST_CLOCK_TIME_IS_VALID (max_latency))
1903     return;
1904
1905   /* Make sure we keep an extra overhead */
1906   max_latency += 100 * GST_MSECOND;
1907   if (max_latency == dbin->current_mq_min_interleave)
1908     return;
1909
1910   dbin->current_mq_min_interleave = max_latency;
1911   GST_DEBUG_OBJECT (dbin, "Setting mq min-interleave to %" GST_TIME_FORMAT,
1912       GST_TIME_ARGS (dbin->current_mq_min_interleave));
1913   g_object_set (dbin->multiqueue, "min-interleave-time",
1914       dbin->current_mq_min_interleave, NULL);
1915 }
1916
1917 static void
1918 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1919 {
1920   GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1921   gboolean posting_collection = FALSE;
1922
1923   GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1924
1925   switch (GST_MESSAGE_TYPE (message)) {
1926     case GST_MESSAGE_STREAM_COLLECTION:
1927     {
1928       GstStreamCollection *collection = NULL;
1929       DecodebinInput *input;
1930
1931       INPUT_LOCK (dbin);
1932       input =
1933           find_message_parsebin (dbin,
1934           (GstElement *) GST_MESSAGE_SRC (message));
1935       if (input == NULL) {
1936         GST_DEBUG_OBJECT (dbin,
1937             "Couldn't find corresponding input, most likely shutting down");
1938         INPUT_UNLOCK (dbin);
1939         break;
1940       }
1941       if (input->upstream_selected) {
1942         GST_DEBUG_OBJECT (dbin,
1943             "Upstream handles selection, not using/forwarding collection");
1944         INPUT_UNLOCK (dbin);
1945         goto drop_message;
1946       }
1947       gst_message_parse_stream_collection (message, &collection);
1948       if (collection) {
1949         handle_stream_collection (dbin, collection, input);
1950         posting_collection = TRUE;
1951       }
1952       INPUT_UNLOCK (dbin);
1953
1954       SELECTION_LOCK (dbin);
1955       if (dbin->collection) {
1956         /* Replace collection message, we most likely aggregated it */
1957         GstMessage *new_msg;
1958         new_msg =
1959             gst_message_new_stream_collection ((GstObject *) dbin,
1960             dbin->collection);
1961         gst_message_unref (message);
1962         message = new_msg;
1963       }
1964       SELECTION_UNLOCK (dbin);
1965
1966       if (collection)
1967         gst_object_unref (collection);
1968       break;
1969     }
1970     case GST_MESSAGE_LATENCY:
1971     {
1972       GList *tmp;
1973       /* Check if this is from one of our decoders */
1974       SELECTION_LOCK (dbin);
1975       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1976         DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1977         if (out->decoder == (GstElement *) GST_MESSAGE_SRC (message)) {
1978           GstClockTime min, max;
1979           if (GST_IS_VIDEO_DECODER (out->decoder)) {
1980             gst_video_decoder_get_latency (GST_VIDEO_DECODER (out->decoder),
1981                 &min, &max);
1982             GST_DEBUG_OBJECT (dbin,
1983                 "Got latency update from one of our decoders. min: %"
1984                 GST_TIME_FORMAT " max: %" GST_TIME_FORMAT, GST_TIME_ARGS (min),
1985                 GST_TIME_ARGS (max));
1986             out->decoder_latency = min;
1987             /* Trigger recalculation */
1988             gst_decodebin3_update_min_interleave (dbin);
1989           }
1990           break;
1991         }
1992       }
1993       SELECTION_UNLOCK (dbin);
1994     }
1995     default:
1996       break;
1997   }
1998
1999   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2000
2001   if (posting_collection) {
2002     /* Figure out a selection for that collection */
2003     update_requested_selection (dbin);
2004   }
2005
2006   return;
2007
2008 drop_message:
2009   {
2010     GST_DEBUG_OBJECT (bin, "dropping message");
2011     gst_message_unref (message);
2012   }
2013 }
2014
2015 static DecodebinOutputStream *
2016 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
2017 {
2018   GList *tmp;
2019   GstStreamType stype = gst_stream_get_stream_type (stream);
2020
2021   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2022     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2023     if (output->type == stype && output->slot && output->slot->active_stream) {
2024       GstStream *tstream = output->slot->active_stream;
2025       if (!stream_in_list (dbin->requested_selection,
2026               (gchar *) gst_stream_get_stream_id (tstream))) {
2027         return output;
2028       }
2029     }
2030   }
2031
2032   return NULL;
2033 }
2034
2035 /* Give a certain slot, figure out if it should be linked to an
2036  * output stream
2037  * CALL WITH SELECTION LOCK TAKEN !*/
2038 static DecodebinOutputStream *
2039 get_output_for_slot (MultiQueueSlot * slot)
2040 {
2041   GstDecodebin3 *dbin = slot->dbin;
2042   DecodebinOutputStream *output = NULL;
2043   const gchar *stream_id;
2044   GstCaps *caps;
2045   gchar *id_in_list = NULL;
2046
2047   /* If we already have a configured output, just use it */
2048   if (slot->output != NULL)
2049     return slot->output;
2050
2051   /*
2052    * FIXME
2053    *
2054    * This method needs to be split into multiple parts
2055    *
2056    * 1) Figure out whether stream should be exposed or not
2057    *   This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
2058    *   in the default stream attribution
2059    *
2060    * 2) Figure out whether an output stream should be created, whether
2061    *   we can re-use the output stream already linked to the slot, or
2062    *   whether we need to get re-assigned another (currently used) output
2063    *   stream.
2064    */
2065
2066   stream_id = gst_stream_get_stream_id (slot->active_stream);
2067   caps = gst_stream_get_caps (slot->active_stream);
2068   GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
2069   gst_caps_unref (caps);
2070
2071   /* 0. Emit autoplug-continue signal for pending caps ? */
2072   GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
2073
2074   /* 1. if in EXPOSE_ALL_MODE, just accept */
2075   GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
2076
2077 #if 0
2078   /* FIXME : The idea around this was to avoid activating a stream for
2079    *     which we have no decoder. Unfortunately it is way too
2080    *     expensive. Need to figure out a better solution */
2081   /* 2. Is there a potential decoder (if one is required) */
2082   if (!gst_caps_can_intersect (caps, dbin->caps)
2083       && !have_factory (dbin, (GstCaps *) caps,
2084           GST_ELEMENT_FACTORY_TYPE_DECODER)) {
2085     GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
2086         caps);
2087     SELECTION_UNLOCK (dbin);
2088     gst_element_post_message (GST_ELEMENT_CAST (dbin),
2089         gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2090     SELECTION_LOCK (dbin);
2091     return NULL;
2092   }
2093 #endif
2094
2095   /* 3. In default mode check if we should expose */
2096   id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
2097   if (id_in_list || dbin->upstream_selected) {
2098     /* Check if we can steal an existing output stream we could re-use.
2099      * that is:
2100      * * an output stream whose slot->stream is not in requested
2101      * * and is of the same type as this stream
2102      */
2103     output = find_free_compatible_output (dbin, slot->active_stream);
2104     if (output) {
2105       /* Move this output from its current slot to this slot */
2106       dbin->to_activate =
2107           g_list_append (dbin->to_activate, (gchar *) stream_id);
2108       dbin->requested_selection =
2109           g_list_remove (dbin->requested_selection, id_in_list);
2110       g_free (id_in_list);
2111       SELECTION_UNLOCK (dbin);
2112       gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2113           (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
2114       SELECTION_LOCK (dbin);
2115       return NULL;
2116     }
2117
2118     output = create_output_stream (dbin, slot->type);
2119     output->slot = slot;
2120     GST_DEBUG ("Linking slot %p to new output %p", slot, output);
2121     slot->output = output;
2122     GST_DEBUG ("Adding '%s' to active_selection", stream_id);
2123     dbin->active_selection =
2124         g_list_append (dbin->active_selection, (gchar *) g_strdup (stream_id));
2125   } else
2126     GST_DEBUG ("Not creating any output for slot %p", slot);
2127
2128   return output;
2129 }
2130
2131 /* Returns SELECTED_STREAMS message if active_selection is equal to
2132  * requested_selection, else NULL.
2133  * Must be called with LOCK taken */
2134 static GstMessage *
2135 is_selection_done (GstDecodebin3 * dbin)
2136 {
2137   GList *tmp;
2138   GstMessage *msg;
2139
2140   if (!dbin->selection_updated)
2141     return NULL;
2142
2143   GST_LOG_OBJECT (dbin, "Checking");
2144
2145   if (dbin->to_activate != NULL) {
2146     GST_DEBUG ("Still have streams to activate");
2147     return NULL;
2148   }
2149   for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
2150     GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
2151     if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
2152       GST_DEBUG ("Not in active selection, returning");
2153       return NULL;
2154     }
2155   }
2156
2157   GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
2158
2159   /* We are completely active */
2160   msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
2161   if (dbin->select_streams_seqnum != GST_SEQNUM_INVALID) {
2162     gst_message_set_seqnum (msg, dbin->select_streams_seqnum);
2163   }
2164   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2165     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2166     if (output->slot) {
2167       const gchar *output_streamid =
2168           gst_stream_get_stream_id (output->slot->active_stream);
2169       GST_DEBUG_OBJECT (dbin, "Adding stream %s", output_streamid);
2170       if (stream_in_list (dbin->requested_selection, output_streamid))
2171         gst_message_streams_selected_add (msg, output->slot->active_stream);
2172       else
2173         GST_WARNING_OBJECT (dbin,
2174             "Output slot still active for old selection ?");
2175     } else
2176       GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
2177   }
2178   dbin->selection_updated = FALSE;
2179   return msg;
2180 }
2181
2182 /* Must be called with SELECTION_LOCK taken */
2183 static void
2184 check_all_slot_for_eos (GstDecodebin3 * dbin, GstEvent * ev)
2185 {
2186   gboolean all_drained = TRUE;
2187   GList *iter;
2188
2189   GST_DEBUG_OBJECT (dbin, "check slot for eos");
2190
2191   for (iter = dbin->slots; iter; iter = iter->next) {
2192     MultiQueueSlot *slot = iter->data;
2193
2194     if (!slot->output)
2195       continue;
2196
2197     if (slot->is_drained) {
2198       GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
2199       continue;
2200     }
2201
2202     all_drained = FALSE;
2203     break;
2204   }
2205
2206   /* Also check with the inputs, data might be pending */
2207   if (all_drained)
2208     all_drained = all_inputs_are_eos (dbin);
2209
2210   if (all_drained) {
2211     GST_DEBUG_OBJECT (dbin,
2212         "All active slots are drained, and no pending input, push EOS");
2213
2214     for (iter = dbin->input_streams; iter; iter = iter->next) {
2215       DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
2216       GstPad *peer = gst_pad_get_peer (input->srcpad);
2217
2218       /* Send EOS to all slots */
2219       if (peer) {
2220         GstEvent *stream_start, *eos;
2221
2222         stream_start =
2223             gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
2224
2225         /* First forward a custom STREAM_START event to reset the EOS status (if any) */
2226         if (stream_start) {
2227           GstStructure *s;
2228           GstEvent *custom_stream_start = gst_event_copy (stream_start);
2229           gst_event_unref (stream_start);
2230           s = (GstStructure *) gst_event_get_structure (custom_stream_start);
2231           gst_structure_set (s, "decodebin3-flushing-stream-start",
2232               G_TYPE_BOOLEAN, TRUE, NULL);
2233           gst_pad_send_event (peer, custom_stream_start);
2234         }
2235
2236         eos = gst_event_new_eos ();
2237         gst_event_set_seqnum (eos, gst_event_get_seqnum (ev));
2238         gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos),
2239             CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA,
2240             NULL);
2241         gst_pad_send_event (peer, eos);
2242         gst_object_unref (peer);
2243       } else
2244         GST_DEBUG_OBJECT (dbin, "no output");
2245     }
2246   }
2247 }
2248
2249 static GstPadProbeReturn
2250 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
2251     MultiQueueSlot * slot)
2252 {
2253   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2254   GstDecodebin3 *dbin = slot->dbin;
2255
2256   if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
2257     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
2258
2259     GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
2260     switch (GST_EVENT_TYPE (ev)) {
2261       case GST_EVENT_STREAM_START:
2262       {
2263         GstStream *stream = NULL;
2264         const GstStructure *s = gst_event_get_structure (ev);
2265
2266         /* Drop STREAM_START events used to cleanup multiqueue */
2267         if (s
2268             && gst_structure_has_field (s,
2269                 "decodebin3-flushing-stream-start")) {
2270           ret = GST_PAD_PROBE_HANDLED;
2271           gst_event_unref (ev);
2272           break;
2273         }
2274
2275         gst_event_parse_stream (ev, &stream);
2276         if (stream == NULL) {
2277           GST_ERROR_OBJECT (pad,
2278               "Got a STREAM_START event without a GstStream");
2279           break;
2280         }
2281         slot->is_drained = FALSE;
2282         GST_DEBUG_OBJECT (pad, "Stream Start '%s'",
2283             gst_stream_get_stream_id (stream));
2284         if (slot->active_stream == NULL) {
2285           slot->active_stream = stream;
2286         } else if (slot->active_stream != stream) {
2287           GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
2288               gst_stream_get_stream_id (slot->active_stream),
2289               gst_stream_get_stream_id (stream));
2290           gst_object_unref (slot->active_stream);
2291           slot->active_stream = stream;
2292         } else
2293           gst_object_unref (stream);
2294 #if 0                           /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
2295         {
2296           gboolean is_active, is_requested;
2297           /* Quick check to see if we're in the current selection */
2298           /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
2299           SELECTION_LOCK (dbin);
2300           GST_DEBUG_OBJECT (dbin, "Checking active selection");
2301           is_active = stream_in_list (dbin->active_selection, stream_id);
2302           GST_DEBUG_OBJECT (dbin, "Checking requested selection");
2303           is_requested = stream_in_list (dbin->requested_selection, stream_id);
2304           SELECTION_UNLOCK (dbin);
2305           if (is_active)
2306             GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
2307                 slot->output);
2308           if (is_requested)
2309             GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
2310                 slot->output);
2311           else if (slot->output) {
2312             GST_DEBUG_OBJECT (pad,
2313                 "Slot needs to be deactivated ? It's no longer in requested selection");
2314           } else if (!is_active)
2315             GST_DEBUG_OBJECT (pad,
2316                 "Slot in neither active nor requested selection");
2317         }
2318 #endif
2319       }
2320         break;
2321       case GST_EVENT_CAPS:
2322       {
2323         /* Configure the output slot if needed */
2324         DecodebinOutputStream *output;
2325         GstMessage *msg = NULL;
2326         SELECTION_LOCK (dbin);
2327         output = get_output_for_slot (slot);
2328         if (output) {
2329           reconfigure_output_stream (output, slot);
2330           msg = is_selection_done (dbin);
2331         }
2332         SELECTION_UNLOCK (dbin);
2333         if (msg)
2334           gst_element_post_message ((GstElement *) slot->dbin, msg);
2335       }
2336         break;
2337       case GST_EVENT_EOS:
2338       {
2339         gboolean was_drained = slot->is_drained;
2340         slot->is_drained = TRUE;
2341
2342         /* Custom EOS handling first */
2343         if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2344                 CUSTOM_EOS_QUARK)) {
2345           /* remove custom-eos */
2346           ev = gst_event_make_writable (ev);
2347           GST_PAD_PROBE_INFO_DATA (info) = ev;
2348           gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
2349               CUSTOM_EOS_QUARK, NULL, NULL);
2350
2351           GST_LOG_OBJECT (pad, "Received custom EOS");
2352           ret = GST_PAD_PROBE_HANDLED;
2353           SELECTION_LOCK (dbin);
2354           if (slot->input == NULL) {
2355             GST_DEBUG_OBJECT (pad,
2356                 "Got custom-eos from null input stream, remove output stream");
2357             /* Remove the output */
2358             if (slot->output) {
2359               DecodebinOutputStream *output = slot->output;
2360               dbin->output_streams =
2361                   g_list_remove (dbin->output_streams, output);
2362               free_output_stream (dbin, output);
2363               /* Reacalculate min interleave */
2364               gst_decodebin3_update_min_interleave (dbin);
2365             }
2366             slot->probe_id = 0;
2367             dbin->slots = g_list_remove (dbin->slots, slot);
2368             free_multiqueue_slot_async (dbin, slot);
2369             ret = GST_PAD_PROBE_REMOVE;
2370           } else if (!was_drained) {
2371             check_all_slot_for_eos (dbin, ev);
2372           }
2373           if (ret == GST_PAD_PROBE_HANDLED)
2374             gst_event_unref (ev);
2375           SELECTION_UNLOCK (dbin);
2376           break;
2377         }
2378
2379         GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
2380             slot->input);
2381         if (slot->input == NULL) {
2382           GstPad *peer;
2383           GST_DEBUG_OBJECT (pad,
2384               "last EOS for input, forwarding and removing slot");
2385           peer = gst_pad_get_peer (pad);
2386           if (peer) {
2387             gst_pad_send_event (peer, gst_event_ref (ev));
2388             gst_object_unref (peer);
2389           }
2390           SELECTION_LOCK (dbin);
2391           /* FIXME : Shouldn't we try to re-assign the output instead of just
2392            * removing it ? */
2393           /* Remove the output */
2394           if (slot->output) {
2395             DecodebinOutputStream *output = slot->output;
2396             dbin->output_streams = g_list_remove (dbin->output_streams, output);
2397             free_output_stream (dbin, output);
2398           }
2399           slot->probe_id = 0;
2400           dbin->slots = g_list_remove (dbin->slots, slot);
2401           SELECTION_UNLOCK (dbin);
2402
2403           /* FIXME: Removing the slot is async, which means actually
2404            * unlinking the pad is async. Other things like stream-start
2405            * might flow through this (now unprobed) link before it actually
2406            * gets released */
2407           free_multiqueue_slot_async (dbin, slot);
2408           ret = GST_PAD_PROBE_REMOVE;
2409         } else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2410                 CUSTOM_FINAL_EOS_QUARK)) {
2411           GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream");
2412         } else {
2413           GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)");
2414           /* drop current event as eos will be sent in check_all_slot_for_eos
2415            * when all output streams are also eos */
2416           ret = GST_PAD_PROBE_DROP;
2417           SELECTION_LOCK (dbin);
2418           check_all_slot_for_eos (dbin, ev);
2419           SELECTION_UNLOCK (dbin);
2420         }
2421       }
2422         break;
2423       default:
2424         break;
2425     }
2426   } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
2427     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
2428     switch (GST_QUERY_TYPE (query)) {
2429       case GST_QUERY_CAPS:
2430       {
2431         GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
2432         gst_query_set_caps_result (query, GST_CAPS_ANY);
2433         ret = GST_PAD_PROBE_HANDLED;
2434       }
2435         break;
2436
2437       case GST_QUERY_ACCEPT_CAPS:
2438       {
2439         GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
2440         /* If the current decoder doesn't accept caps, we'll reconfigure
2441          * on the actual caps event. So accept any caps. */
2442         gst_query_set_accept_caps_result (query, TRUE);
2443         ret = GST_PAD_PROBE_HANDLED;
2444       }
2445       default:
2446         break;
2447     }
2448   }
2449
2450   return ret;
2451 }
2452
2453 /* Create a new multiqueue slot for the given type
2454  *
2455  * It is up to the caller to know whether that slot is needed or not
2456  * (and release it when no longer needed) */
2457 static MultiQueueSlot *
2458 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
2459 {
2460   MultiQueueSlot *slot;
2461   GstIterator *it = NULL;
2462   GValue item = { 0, };
2463
2464   GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
2465       gst_stream_type_get_name (type));
2466   slot = g_new0 (MultiQueueSlot, 1);
2467   slot->dbin = dbin;
2468
2469   slot->id = dbin->slot_id++;
2470
2471   slot->type = type;
2472   slot->sink_pad = gst_element_request_pad_simple (dbin->multiqueue, "sink_%u");
2473   if (slot->sink_pad == NULL)
2474     goto fail;
2475
2476   it = gst_pad_iterate_internal_links (slot->sink_pad);
2477   if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
2478       || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
2479     GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
2480         GST_DEBUG_PAD_NAME (slot->src_pad));
2481     goto fail;
2482   }
2483   gst_iterator_free (it);
2484   g_value_reset (&item);
2485
2486   g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
2487
2488   /* Add event probe */
2489   slot->probe_id =
2490       gst_pad_add_probe (slot->src_pad,
2491       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2492       (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
2493
2494   GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
2495       GST_DEBUG_PAD_NAME (slot->src_pad));
2496
2497   dbin->slots = g_list_append (dbin->slots, slot);
2498
2499   return slot;
2500
2501   /* ERRORS */
2502 fail:
2503   {
2504     if (slot->sink_pad)
2505       gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2506     g_free (slot);
2507     return NULL;
2508   }
2509 }
2510
2511 /* Must be called with SELECTION_LOCK */
2512 static MultiQueueSlot *
2513 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
2514 {
2515   GList *tmp;
2516   MultiQueueSlot *empty_slot = NULL;
2517   GstStreamType input_type = 0;
2518   gchar *stream_id = NULL;
2519
2520   GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
2521       input, input->active_stream,
2522       input->
2523       active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
2524
2525   if (input->active_stream) {
2526     input_type = gst_stream_get_stream_type (input->active_stream);
2527     stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
2528   }
2529
2530   /* Go over existing slots and check if there is already one for it */
2531   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2532     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2533     /* Already used input, return that one */
2534     if (slot->input == input) {
2535       GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
2536       return slot;
2537     }
2538   }
2539
2540   /* Go amongst all unused slots of the right type and try to find a candidate */
2541   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2542     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2543     if (slot->input == NULL && input_type == slot->type) {
2544       /* Remember this empty slot for later */
2545       empty_slot = slot;
2546       /* Check if available slot is of the same stream_id */
2547       GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
2548           slot->id, slot->active_stream);
2549       if (stream_id && slot->active_stream) {
2550         gchar *ostream_id =
2551             (gchar *) gst_stream_get_stream_id (slot->active_stream);
2552         GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
2553             ostream_id, stream_id);
2554         if (!g_strcmp0 (stream_id, ostream_id))
2555           break;
2556       }
2557     }
2558   }
2559
2560   if (empty_slot) {
2561     GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
2562     empty_slot->input = input;
2563     return empty_slot;
2564   }
2565
2566   if (input_type)
2567     return create_new_slot (dbin, input_type);
2568
2569   return NULL;
2570 }
2571
2572 static void
2573 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
2574 {
2575   if (slot->input != NULL && slot->input != input) {
2576     GST_ERROR_OBJECT (slot->dbin,
2577         "Trying to link input to an already used slot");
2578     return;
2579   }
2580   gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
2581   slot->pending_stream = input->active_stream;
2582   slot->input = input;
2583 }
2584
2585 #if 0
2586 static gboolean
2587 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
2588     GstElementFactoryListType ftype)
2589 {
2590   gboolean ret = FALSE;
2591   GList *res;
2592
2593   g_mutex_lock (&dbin->factories_lock);
2594   gst_decode_bin_update_factories_list (dbin);
2595   if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
2596     res =
2597         gst_element_factory_list_filter (dbin->decoder_factories,
2598         caps, GST_PAD_SINK, TRUE);
2599   else
2600     res =
2601         gst_element_factory_list_filter (dbin->decodable_factories,
2602         caps, GST_PAD_SINK, TRUE);
2603   g_mutex_unlock (&dbin->factories_lock);
2604
2605   if (res) {
2606     ret = TRUE;
2607     gst_plugin_feature_list_free (res);
2608   }
2609
2610   return ret;
2611 }
2612 #endif
2613
2614 static GList *
2615 create_decoder_factory_list (GstDecodebin3 * dbin, GstCaps * caps)
2616 {
2617   GList *res;
2618
2619   g_mutex_lock (&dbin->factories_lock);
2620   gst_decode_bin_update_factories_list (dbin);
2621   res = gst_element_factory_list_filter (dbin->decoder_factories,
2622       caps, GST_PAD_SINK, TRUE);
2623   g_mutex_unlock (&dbin->factories_lock);
2624   return res;
2625 }
2626
2627 static GstPadProbeReturn
2628 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
2629     DecodebinOutputStream * output)
2630 {
2631   GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2632
2633   /* If we have a keyframe, remove the probe and let all data through */
2634   if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
2635       GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
2636     GST_DEBUG_OBJECT (pad,
2637         "Buffer is keyframe or header, letting through and removing probe");
2638     output->drop_probe_id = 0;
2639     return GST_PAD_PROBE_REMOVE;
2640   }
2641   GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
2642   return GST_PAD_PROBE_DROP;
2643 }
2644
2645 static void
2646 reconfigure_output_stream (DecodebinOutputStream * output,
2647     MultiQueueSlot * slot)
2648 {
2649   GstDecodebin3 *dbin = output->dbin;
2650   GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
2651   gboolean needs_decoder;
2652
2653   needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
2654
2655   GST_DEBUG_OBJECT (dbin,
2656       "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
2657       needs_decoder);
2658
2659   /* FIXME : Maybe make the output un-hook itself automatically ? */
2660   if (output->slot != NULL && output->slot != slot) {
2661     GST_WARNING_OBJECT (dbin,
2662         "Output still linked to another slot (%p)", output->slot);
2663     gst_caps_unref (new_caps);
2664     return;
2665   }
2666
2667   /* Check if existing config is reusable as-is by checking if
2668    * the existing decoder accepts the new caps, if not delete
2669    * it and create a new one */
2670   if (output->decoder) {
2671     gboolean can_reuse_decoder;
2672
2673     if (needs_decoder) {
2674       can_reuse_decoder =
2675           gst_pad_query_accept_caps (output->decoder_sink, new_caps);
2676     } else
2677       can_reuse_decoder = FALSE;
2678
2679     if (can_reuse_decoder) {
2680       if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
2681         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2682         output->drop_probe_id =
2683             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2684             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2685       }
2686       GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
2687       if (output->linked == FALSE) {
2688         gst_pad_link_full (slot->src_pad, output->decoder_sink,
2689             GST_PAD_LINK_CHECK_NOTHING);
2690         output->linked = TRUE;
2691       }
2692       gst_caps_unref (new_caps);
2693       return;
2694     }
2695
2696     GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
2697
2698     if (output->linked)
2699       gst_pad_unlink (slot->src_pad, output->decoder_sink);
2700     output->linked = FALSE;
2701     if (output->drop_probe_id) {
2702       gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2703       output->drop_probe_id = 0;
2704     }
2705
2706     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2707       GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
2708       gst_caps_unref (new_caps);
2709       goto cleanup;
2710     }
2711
2712     gst_element_set_locked_state (output->decoder, TRUE);
2713     gst_element_set_state (output->decoder, GST_STATE_NULL);
2714
2715     gst_bin_remove ((GstBin *) dbin, output->decoder);
2716     output->decoder = NULL;
2717     output->decoder_latency = GST_CLOCK_TIME_NONE;
2718   } else if (output->linked) {
2719     /* Otherwise if we have no decoder yet but the output is linked make
2720      * sure that the ghost pad is really unlinked in case no decoder was
2721      * needed previously */
2722     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2723       GST_ERROR_OBJECT (dbin, "Could not release ghost pad");
2724       gst_caps_unref (new_caps);
2725       goto cleanup;
2726     }
2727   }
2728
2729   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2730   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2731
2732   /* If a decoder is required, create one */
2733   if (needs_decoder) {
2734     GList *factories, *next_factory;
2735
2736     factories = next_factory = create_decoder_factory_list (dbin, new_caps);
2737     while (!output->decoder) {
2738       gboolean decoder_failed = FALSE;
2739
2740       /* If we don't have a decoder yet, instantiate one */
2741       if (next_factory) {
2742         output->decoder = gst_element_factory_create ((GstElementFactory *)
2743             next_factory->data, NULL);
2744         GST_DEBUG ("Created decoder '%s'", GST_ELEMENT_NAME (output->decoder));
2745       } else
2746         GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT,
2747             new_caps);
2748
2749       if (output->decoder == NULL) {
2750         GstCaps *caps;
2751
2752         SELECTION_UNLOCK (dbin);
2753         /* FIXME : Should we be smarter if there's a missing decoder ?
2754          * Should we deactivate that stream ? */
2755         caps = gst_stream_get_caps (slot->active_stream);
2756         gst_element_post_message (GST_ELEMENT_CAST (dbin),
2757             gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2758         gst_caps_unref (caps);
2759         SELECTION_LOCK (dbin);
2760         goto cleanup;
2761       }
2762       if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
2763         GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
2764         goto cleanup;
2765       }
2766       output->decoder_sink =
2767           gst_element_get_static_pad (output->decoder, "sink");
2768       output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
2769       if (output->type & GST_STREAM_TYPE_VIDEO) {
2770         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2771         output->drop_probe_id =
2772             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2773             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2774       }
2775       if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
2776               GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2777         GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
2778             GST_DEBUG_PAD_NAME (output->decoder_sink));
2779         goto cleanup;
2780       }
2781       if (gst_element_set_state (output->decoder,
2782               GST_STATE_READY) == GST_STATE_CHANGE_FAILURE) {
2783         GST_DEBUG_OBJECT (dbin,
2784             "Decoder '%s' failed to reach READY state, trying the next type",
2785             GST_ELEMENT_NAME (output->decoder));
2786         decoder_failed = TRUE;
2787       }
2788       if (!gst_pad_query_accept_caps (output->decoder_sink, new_caps)) {
2789         GST_DEBUG_OBJECT (dbin,
2790             "Decoder '%s' did not accept the caps, trying the next type",
2791             GST_ELEMENT_NAME (output->decoder));
2792         decoder_failed = TRUE;
2793       }
2794       if (decoder_failed) {
2795         gst_pad_unlink (slot->src_pad, output->decoder_sink);
2796         if (output->drop_probe_id) {
2797           gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2798           output->drop_probe_id = 0;
2799         }
2800
2801         gst_element_set_locked_state (output->decoder, TRUE);
2802         gst_element_set_state (output->decoder, GST_STATE_NULL);
2803
2804         gst_bin_remove ((GstBin *) dbin, output->decoder);
2805         output->decoder = NULL;
2806       }
2807       next_factory = next_factory->next;
2808     }
2809     gst_plugin_feature_list_free (factories);
2810   } else {
2811     output->decoder_src = gst_object_ref (slot->src_pad);
2812     output->decoder_sink = NULL;
2813   }
2814   gst_caps_unref (new_caps);
2815
2816   output->linked = TRUE;
2817   if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
2818           output->decoder_src)) {
2819     GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
2820     goto cleanup;
2821   }
2822   if (output->src_exposed == FALSE) {
2823     GstEvent *stream_start;
2824
2825     stream_start = gst_pad_get_sticky_event (slot->src_pad,
2826         GST_EVENT_STREAM_START, 0);
2827
2828     /* Ensure GstStream is accesiable from pad-added callback */
2829     if (stream_start) {
2830       gst_pad_store_sticky_event (output->src_pad, stream_start);
2831       gst_event_unref (stream_start);
2832     } else {
2833       GST_WARNING_OBJECT (slot->src_pad,
2834           "Pad has no stored stream-start event");
2835     }
2836
2837     output->src_exposed = TRUE;
2838     gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
2839   }
2840
2841   if (output->decoder)
2842     gst_element_sync_state_with_parent (output->decoder);
2843
2844   output->slot = slot;
2845   return;
2846
2847 cleanup:
2848   {
2849     GST_DEBUG_OBJECT (dbin, "Cleanup");
2850     if (output->decoder_sink) {
2851       gst_object_unref (output->decoder_sink);
2852       output->decoder_sink = NULL;
2853     }
2854     if (output->decoder_src) {
2855       gst_object_unref (output->decoder_src);
2856       output->decoder_src = NULL;
2857     }
2858     if (output->decoder) {
2859       gst_element_set_state (output->decoder, GST_STATE_NULL);
2860       gst_bin_remove ((GstBin *) dbin, output->decoder);
2861       output->decoder = NULL;
2862     }
2863   }
2864 }
2865
2866 static GstPadProbeReturn
2867 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2868 {
2869   GstMessage *msg = NULL;
2870   DecodebinOutputStream *output;
2871
2872   SELECTION_LOCK (slot->dbin);
2873   output = get_output_for_slot (slot);
2874
2875   GST_DEBUG_OBJECT (pad, "output : %p", output);
2876
2877   if (output) {
2878     reconfigure_output_stream (output, slot);
2879     msg = is_selection_done (slot->dbin);
2880   }
2881   SELECTION_UNLOCK (slot->dbin);
2882   if (msg)
2883     gst_element_post_message ((GstElement *) slot->dbin, msg);
2884
2885   return GST_PAD_PROBE_REMOVE;
2886 }
2887
2888 static MultiQueueSlot *
2889 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2890 {
2891   GList *tmp;
2892
2893   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2894     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2895     const gchar *stream_id;
2896     if (slot->active_stream) {
2897       stream_id = gst_stream_get_stream_id (slot->active_stream);
2898       if (!g_strcmp0 (sid, stream_id))
2899         return slot;
2900     }
2901     if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2902       stream_id = gst_stream_get_stream_id (slot->pending_stream);
2903       if (!g_strcmp0 (sid, stream_id))
2904         return slot;
2905     }
2906   }
2907
2908   return NULL;
2909 }
2910
2911 /* This function handles the reassignment of a slot. Call this from
2912  * the streaming thread of a slot. */
2913 static gboolean
2914 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2915 {
2916   DecodebinOutputStream *output;
2917   MultiQueueSlot *target_slot = NULL;
2918   GList *tmp;
2919   const gchar *sid, *tsid;
2920
2921   SELECTION_LOCK (dbin);
2922   output = slot->output;
2923
2924   if (G_UNLIKELY (slot->active_stream == NULL)) {
2925     GST_DEBUG_OBJECT (slot->src_pad,
2926         "Called on inactive slot (active_stream == NULL)");
2927     SELECTION_UNLOCK (dbin);
2928     return FALSE;
2929   }
2930
2931   if (G_UNLIKELY (output == NULL)) {
2932     GST_DEBUG_OBJECT (slot->src_pad,
2933         "Slot doesn't have any output to be removed");
2934     SELECTION_UNLOCK (dbin);
2935     return FALSE;
2936   }
2937
2938   sid = gst_stream_get_stream_id (slot->active_stream);
2939   GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2940
2941   /* Recheck whether this stream is still in the list of streams to deactivate */
2942   if (stream_in_list (dbin->requested_selection, sid)) {
2943     /* Stream is in the list of requested streams, don't remove */
2944     SELECTION_UNLOCK (dbin);
2945     GST_DEBUG_OBJECT (slot->src_pad,
2946         "Stream '%s' doesn't need to be deactivated", sid);
2947     return FALSE;
2948   }
2949
2950   /* Unlink slot from output */
2951   /* FIXME : Handle flushing ? */
2952   /* FIXME : Handle outputs without decoders */
2953   GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2954       output->decoder_sink);
2955   if (output->decoder_sink)
2956     gst_pad_unlink (slot->src_pad, output->decoder_sink);
2957   output->linked = FALSE;
2958   slot->output = NULL;
2959   output->slot = NULL;
2960   /* Remove sid from active selection */
2961   GST_DEBUG ("Removing '%s' from active_selection", sid);
2962   for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2963     if (!g_strcmp0 (sid, tmp->data)) {
2964       g_free (tmp->data);
2965       dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2966       break;
2967     }
2968
2969   /* Can we re-assign this output to a requested stream ? */
2970   GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2971   for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2972     MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2973     GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2974         tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2975     if (tslot && tslot->type == output->type && tslot->output == NULL) {
2976       GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2977       target_slot = tslot;
2978       tsid = tmp->data;
2979       /* Pass target stream id to requested selection */
2980       dbin->requested_selection =
2981           g_list_append (dbin->requested_selection, g_strdup (tmp->data));
2982       dbin->to_activate = g_list_delete_link (dbin->to_activate, tmp);
2983       break;
2984     }
2985   }
2986
2987   if (target_slot) {
2988     GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2989         target_slot, tsid);
2990     target_slot->output = output;
2991     output->slot = target_slot;
2992     GST_DEBUG ("Adding '%s' to active_selection", tsid);
2993     dbin->active_selection =
2994         g_list_append (dbin->active_selection, (gchar *) g_strdup (tsid));
2995     SELECTION_UNLOCK (dbin);
2996
2997     /* Wakeup the target slot so that it retries to send events/buffers
2998      * thereby triggering the output reconfiguration codepath */
2999     gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
3000         (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
3001     /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
3002   } else {
3003     GstMessage *msg;
3004
3005     dbin->output_streams = g_list_remove (dbin->output_streams, output);
3006     free_output_stream (dbin, output);
3007     msg = is_selection_done (slot->dbin);
3008     SELECTION_UNLOCK (dbin);
3009
3010     if (msg)
3011       gst_element_post_message ((GstElement *) slot->dbin, msg);
3012   }
3013
3014   return TRUE;
3015 }
3016
3017 /* Idle probe called when a slot should be unassigned from its output stream.
3018  * This is needed to ensure nothing is flowing when unlinking the slot.
3019  *
3020  * Also, this method will search for a pending stream which could re-use
3021  * the output stream. */
3022 static GstPadProbeReturn
3023 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
3024     MultiQueueSlot * slot)
3025 {
3026   GstDecodebin3 *dbin = slot->dbin;
3027
3028   reassign_slot (dbin, slot);
3029
3030   return GST_PAD_PROBE_REMOVE;
3031 }
3032
3033 static gboolean
3034 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
3035     guint32 seqnum)
3036 {
3037   gboolean ret = TRUE;
3038   GList *tmp;
3039   /* List of slots to (de)activate. */
3040   GList *to_deactivate = NULL;
3041   GList *to_activate = NULL;
3042   /* List of unknown stream id, most likely means the event
3043    * should be sent upstream so that elements can expose the requested stream */
3044   GList *unknown = NULL;
3045   GList *to_reassign = NULL;
3046   GList *future_request_streams = NULL;
3047   GList *pending_streams = NULL;
3048   GList *slots_to_reassign = NULL;
3049
3050   SELECTION_LOCK (dbin);
3051   if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
3052     GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
3053     SELECTION_UNLOCK (dbin);
3054     return TRUE;
3055   }
3056   /* Remove pending select_streams */
3057   g_list_free (dbin->pending_select_streams);
3058   dbin->pending_select_streams = NULL;
3059
3060   /* COMPARE the requested streams to the active and requested streams
3061    * on multiqueue. */
3062
3063   /* First check the slots to activate and which ones are unknown */
3064   for (tmp = select_streams; tmp; tmp = tmp->next) {
3065     const gchar *sid = (const gchar *) tmp->data;
3066     MultiQueueSlot *slot;
3067     GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
3068     slot = find_slot_for_stream_id (dbin, sid);
3069     /* Find the corresponding slot */
3070     if (slot == NULL) {
3071       if (stream_in_collection (dbin, (gchar *) sid)) {
3072         pending_streams = g_list_append (pending_streams, (gchar *) sid);
3073       } else {
3074         GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
3075         unknown = g_list_append (unknown, (gchar *) sid);
3076       }
3077     } else if (slot->output == NULL) {
3078       GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
3079           slot, sid);
3080       to_activate = g_list_append (to_activate, slot);
3081     } else {
3082       GST_DEBUG_OBJECT (dbin,
3083           "Stream '%s' from slot %p is already active on output %p", sid, slot,
3084           slot->output);
3085       future_request_streams =
3086           g_list_append (future_request_streams, (gchar *) sid);
3087     }
3088   }
3089
3090   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
3091     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3092     /* For slots that have an output, check if it's part of the streams to
3093      * be active */
3094     if (slot->output) {
3095       gboolean slot_to_deactivate = TRUE;
3096
3097       if (slot->active_stream) {
3098         if (stream_in_list (select_streams,
3099                 gst_stream_get_stream_id (slot->active_stream)))
3100           slot_to_deactivate = FALSE;
3101       }
3102       if (slot_to_deactivate && slot->pending_stream
3103           && slot->pending_stream != slot->active_stream) {
3104         if (stream_in_list (select_streams,
3105                 gst_stream_get_stream_id (slot->pending_stream)))
3106           slot_to_deactivate = FALSE;
3107       }
3108       if (slot_to_deactivate) {
3109         GST_DEBUG_OBJECT (dbin,
3110             "Slot %p (%s) should be deactivated, no longer used", slot,
3111             slot->
3112             active_stream ? gst_stream_get_stream_id (slot->active_stream) :
3113             "NULL");
3114         to_deactivate = g_list_append (to_deactivate, slot);
3115       }
3116     }
3117   }
3118
3119   if (to_deactivate != NULL) {
3120     GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
3121     /* We need to compare what needs to be activated and deactivated in order
3122      * to determine whether there are outputs that can be transferred */
3123     /* Take the stream-id of the slots that are to be activated, for which there
3124      * is a slot of the same type that needs to be deactivated */
3125     tmp = to_deactivate;
3126     while (tmp) {
3127       MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
3128       gboolean removeit = FALSE;
3129       GList *tmp2, *next;
3130       GST_DEBUG_OBJECT (dbin,
3131           "Checking if slot to deactivate (%p) has a candidate slot to activate",
3132           slot_to_deactivate);
3133       for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
3134         MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
3135         GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
3136         if (slot_to_activate->type == slot_to_deactivate->type) {
3137           GST_DEBUG_OBJECT (dbin, "Re-using");
3138           to_reassign = g_list_append (to_reassign, (gchar *)
3139               gst_stream_get_stream_id (slot_to_activate->active_stream));
3140           slots_to_reassign =
3141               g_list_append (slots_to_reassign, slot_to_deactivate);
3142           to_activate = g_list_remove (to_activate, slot_to_activate);
3143           removeit = TRUE;
3144           break;
3145         }
3146       }
3147       next = tmp->next;
3148       if (removeit)
3149         to_deactivate = g_list_delete_link (to_deactivate, tmp);
3150       tmp = next;
3151     }
3152   }
3153
3154   for (tmp = to_deactivate; tmp; tmp = tmp->next) {
3155     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3156     GST_DEBUG_OBJECT (dbin,
3157         "Really need to deactivate slot %p, but no available alternative",
3158         slot);
3159
3160     slots_to_reassign = g_list_append (slots_to_reassign, slot);
3161   }
3162
3163   /* The only slots left to activate are the ones that won't be reassigned and
3164    * therefore really need to have a new output created */
3165   for (tmp = to_activate; tmp; tmp = tmp->next) {
3166     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3167     if (slot->active_stream)
3168       future_request_streams =
3169           g_list_append (future_request_streams,
3170           (gchar *) gst_stream_get_stream_id (slot->active_stream));
3171     else if (slot->pending_stream)
3172       future_request_streams =
3173           g_list_append (future_request_streams,
3174           (gchar *) gst_stream_get_stream_id (slot->pending_stream));
3175     else
3176       GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
3177   }
3178
3179   if (to_activate == NULL && pending_streams != NULL) {
3180     GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
3181     if (dbin->requested_selection)
3182       g_list_free_full (dbin->requested_selection, g_free);
3183     dbin->requested_selection =
3184         g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
3185     g_list_free (to_deactivate);
3186     g_list_free (pending_streams);
3187     to_deactivate = NULL;
3188     pending_streams = NULL;
3189   } else {
3190     if (dbin->requested_selection)
3191       g_list_free_full (dbin->requested_selection, g_free);
3192     dbin->requested_selection =
3193         g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
3194     dbin->requested_selection =
3195         g_list_concat (dbin->requested_selection,
3196         g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
3197     if (dbin->to_activate)
3198       g_list_free (dbin->to_activate);
3199     dbin->to_activate = g_list_copy (to_reassign);
3200   }
3201
3202   dbin->selection_updated = TRUE;
3203   SELECTION_UNLOCK (dbin);
3204
3205   if (unknown) {
3206     GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
3207     g_list_free (unknown);
3208   }
3209
3210   if (to_activate && !slots_to_reassign) {
3211     for (tmp = to_activate; tmp; tmp = tmp->next) {
3212       MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3213       gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
3214           (GstPadProbeCallback) idle_reconfigure, slot, NULL);
3215     }
3216   }
3217
3218   /* For all streams to deactivate, add an idle probe where we will do
3219    * the unassignment and switch over */
3220   for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
3221     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3222     gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
3223         (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
3224   }
3225
3226   if (to_deactivate)
3227     g_list_free (to_deactivate);
3228   if (to_activate)
3229     g_list_free (to_activate);
3230   if (to_reassign)
3231     g_list_free (to_reassign);
3232   if (future_request_streams)
3233     g_list_free (future_request_streams);
3234   if (pending_streams)
3235     g_list_free (pending_streams);
3236   if (slots_to_reassign)
3237     g_list_free (slots_to_reassign);
3238
3239   return ret;
3240 }
3241
3242 static GstPadProbeReturn
3243 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
3244     DecodebinOutputStream * output)
3245 {
3246   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
3247   GstDecodebin3 *dbin = output->dbin;
3248   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
3249
3250   GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
3251
3252   switch (GST_EVENT_TYPE (event)) {
3253     case GST_EVENT_SELECT_STREAMS:
3254     {
3255       GstPad *peer;
3256       GList *streams = NULL;
3257       guint32 seqnum = gst_event_get_seqnum (event);
3258
3259       if (dbin->upstream_selected) {
3260         GST_DEBUG_OBJECT (pad, "Letting select-streams event flow upstream");
3261         break;
3262       }
3263
3264       SELECTION_LOCK (dbin);
3265       if (seqnum == dbin->select_streams_seqnum) {
3266         SELECTION_UNLOCK (dbin);
3267         GST_DEBUG_OBJECT (pad,
3268             "Already handled/handling that SELECT_STREAMS event");
3269         gst_event_unref (event);
3270         ret = GST_PAD_PROBE_HANDLED;
3271         break;
3272       }
3273       dbin->select_streams_seqnum = seqnum;
3274       if (dbin->pending_select_streams != NULL) {
3275         GST_LOG_OBJECT (dbin, "Replacing pending select streams");
3276         g_list_free (dbin->pending_select_streams);
3277         dbin->pending_select_streams = NULL;
3278       }
3279       gst_event_parse_select_streams (event, &streams);
3280       dbin->pending_select_streams = g_list_copy (streams);
3281       SELECTION_UNLOCK (dbin);
3282
3283       /* Send event upstream */
3284       if ((peer = gst_pad_get_peer (pad))) {
3285         gst_pad_send_event (peer, event);
3286         gst_object_unref (peer);
3287       } else {
3288         gst_event_unref (event);
3289       }
3290       /* Finally handle the switch */
3291       if (streams) {
3292         handle_stream_switch (dbin, streams, seqnum);
3293         g_list_free_full (streams, g_free);
3294       }
3295       ret = GST_PAD_PROBE_HANDLED;
3296     }
3297       break;
3298     default:
3299       break;
3300   }
3301
3302   return ret;
3303 }
3304
3305 static gboolean
3306 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
3307 {
3308   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3309
3310   GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
3311   if (!dbin->upstream_selected
3312       && GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
3313     GList *streams = NULL;
3314     guint32 seqnum = gst_event_get_seqnum (event);
3315
3316     SELECTION_LOCK (dbin);
3317     if (seqnum == dbin->select_streams_seqnum) {
3318       SELECTION_UNLOCK (dbin);
3319       GST_DEBUG_OBJECT (dbin,
3320           "Already handled/handling that SELECT_STREAMS event");
3321       return TRUE;
3322     }
3323     dbin->select_streams_seqnum = seqnum;
3324     if (dbin->pending_select_streams != NULL) {
3325       GST_LOG_OBJECT (dbin, "Replacing pending select streams");
3326       g_list_free (dbin->pending_select_streams);
3327       dbin->pending_select_streams = NULL;
3328     }
3329     gst_event_parse_select_streams (event, &streams);
3330     dbin->pending_select_streams = g_list_copy (streams);
3331     SELECTION_UNLOCK (dbin);
3332
3333 #if 0
3334     /* Send event upstream */
3335     if ((peer = gst_pad_get_peer (pad))) {
3336       gst_pad_send_event (peer, event);
3337       gst_object_unref (peer);
3338     }
3339 #endif
3340     /* Finally handle the switch */
3341     if (streams) {
3342       handle_stream_switch (dbin, streams, seqnum);
3343       g_list_free_full (streams, g_free);
3344     }
3345
3346     gst_event_unref (event);
3347     return TRUE;
3348   }
3349   return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
3350 }
3351
3352
3353 static void
3354 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3355 {
3356   if (slot->probe_id)
3357     gst_pad_remove_probe (slot->src_pad, slot->probe_id);
3358   if (slot->input) {
3359     if (slot->input->srcpad)
3360       gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
3361   }
3362
3363   gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
3364   gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
3365   gst_object_replace ((GstObject **) & slot->src_pad, NULL);
3366   gst_object_replace ((GstObject **) & slot->active_stream, NULL);
3367   g_free (slot);
3368 }
3369
3370 static void
3371 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3372 {
3373   GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
3374   gst_element_call_async (GST_ELEMENT_CAST (dbin),
3375       (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
3376 }
3377
3378 /* Create a DecodebinOutputStream for a given type
3379  * Note: It will be empty initially, it needs to be configured
3380  * afterwards */
3381 static DecodebinOutputStream *
3382 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
3383 {
3384   DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
3385   gchar *pad_name;
3386   const gchar *prefix;
3387   GstStaticPadTemplate *templ;
3388   GstPadTemplate *ptmpl;
3389   guint32 *counter;
3390   GstPad *internal_pad;
3391
3392   GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
3393       res, gst_stream_type_get_name (type));
3394
3395   res->type = type;
3396   res->dbin = dbin;
3397   res->decoder_latency = GST_CLOCK_TIME_NONE;
3398
3399   if (type & GST_STREAM_TYPE_VIDEO) {
3400     templ = &video_src_template;
3401     counter = &dbin->vpadcount;
3402     prefix = "video";
3403   } else if (type & GST_STREAM_TYPE_AUDIO) {
3404     templ = &audio_src_template;
3405     counter = &dbin->apadcount;
3406     prefix = "audio";
3407   } else if (type & GST_STREAM_TYPE_TEXT) {
3408     templ = &text_src_template;
3409     counter = &dbin->tpadcount;
3410     prefix = "text";
3411   } else {
3412     templ = &src_template;
3413     counter = &dbin->opadcount;
3414     prefix = "src";
3415   }
3416
3417   pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
3418   *counter += 1;
3419   ptmpl = gst_static_pad_template_get (templ);
3420   res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
3421   gst_object_unref (ptmpl);
3422   g_free (pad_name);
3423   gst_pad_set_active (res->src_pad, TRUE);
3424   /* Put an event probe on the internal proxy pad to detect upstream
3425    * events */
3426   internal_pad =
3427       (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
3428   gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
3429       (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
3430   gst_object_unref (internal_pad);
3431
3432   dbin->output_streams = g_list_append (dbin->output_streams, res);
3433
3434   return res;
3435 }
3436
3437 static void
3438 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
3439 {
3440   if (output->slot) {
3441     if (output->decoder_sink && output->decoder)
3442       gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
3443
3444     output->slot->output = NULL;
3445     output->slot = NULL;
3446   }
3447   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
3448   gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
3449   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
3450   if (output->src_exposed) {
3451     gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
3452   }
3453   if (output->decoder) {
3454     gst_element_set_locked_state (output->decoder, TRUE);
3455     gst_element_set_state (output->decoder, GST_STATE_NULL);
3456     gst_bin_remove ((GstBin *) dbin, output->decoder);
3457   }
3458   g_free (output);
3459 }
3460
3461 static GstStateChangeReturn
3462 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
3463 {
3464   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3465   GstStateChangeReturn ret;
3466
3467   /* Upwards */
3468   switch (transition) {
3469     default:
3470       break;
3471   }
3472   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3473   if (ret == GST_STATE_CHANGE_FAILURE)
3474     goto beach;
3475
3476   switch (transition) {
3477     case GST_STATE_CHANGE_PAUSED_TO_READY:
3478       gst_decodebin3_reset (dbin);
3479       break;
3480     default:
3481       break;
3482   }
3483 beach:
3484   return ret;
3485 }