decodebin3: Update stream-collection with _input_pad_unlink()
[platform/upstream/gstreamer.git] / gst / playback / gstdecodebin3.c
1 /* GStreamer
2  *
3  * Copyright (C) <2015> Centricular Ltd
4  *  @author: Edward Hervey <edward@centricular.com>
5  *  @author: Jan Schmidt <jan@centricular.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <glib.h>
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
30 #include <gst/gst.h>
31 #include <gst/pbutils/pbutils.h>
32
33 #include "gstplayback.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
36
37 /**
38  * SECTION:element-decodebin3
39  *
40  * #GstBin that auto-magically constructs a decoding pipeline using available
41  * decoders and demuxers via auto-plugging. The output is raw audio, video
42  * or subtitle streams.
43  *
44  * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
45  *
46  * <itemizedlist>
47  * <listitem>
48  * supports publication and selection of stream information via
49  * GstStreamCollection messages and #GST_EVENT_SELECT_STREAM events.
50  * </listitem>
51  * <listitem>
52  * dynamically switches stream connections internally, and
53  * reuses decoder elements when stream selections change, so that in
54  * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
55  * and only creates new elements when streams change and an existing decoder
56  * is not capable of handling the new format.
57  * </listitem>
58  * <listitem>
59  * supports multiple input pads for the parallel decoding of auxilliary streams
60  * not muxed with the primary stream.
61  * </listitem>
62  * <listitem>
63  * does not handle network stream buffering. decodebin3 expects that network stream
64  * buffering is handled upstream, before data is passed to it.
65  * </listitem>
66  * </itemizedlist>
67  *
68  * <emphasis>decodebin3 is still experimental API and a technology preview.
69  * Its behaviour and exposed API is subject to change.</emphasis>
70  *
71  */
72
73 /**
74  * Global design
75  *
76  * 1) From sink pad to elementary streams (GstParseBin)
77  *
78  * The input sink pads are fed to GstParseBin. GstParseBin will feed them
79  * through typefind. When the caps are detected (or changed) we recursively
80  * figure out which demuxer, parser or depayloader is needed until we get to
81  * elementary streams.
82  *
83  * All elementary streams (whether decoded or not, whether exposed or not) are
84  * fed through multiqueue. There is only *one* multiqueue in decodebin3.
85  *
86  * => MultiQueue is the cornerstone.
87  * => No buffering before multiqueue
88  *
89  * 2) Elementary streams
90  *
91  * After GstParseBin, there are 3 main components:
92  *  1) Input Streams (provided by GstParseBin)
93  *  2) Multiqueue slots
94  *  3) Output Streams
95  *
96  * Input Streams correspond to the stream coming from GstParseBin and that gets
97  * fed into a multiqueue slot.
98  *
99  * Output Streams correspond to the combination of a (optional) decoder and an
100  * output ghostpad. Output Streams can be moved from one multiqueue slot to
101  * another, can reconfigure itself (different decoders), and can be
102  * added/removed depending on the configuration (all streams outputted, only one
103  * of each type, ...).
104  *
105  * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
106  * each 'active' Input Stream there is a corresponding slot.
107  * Slots might have different streams on input and output (due to internal
108  * buffering).
109  *
110  * Due to internal queuing/buffering/..., all those components (might) behave
111  * asynchronously. Therefore probes will be used on each component source pad to
112  * detect various key-points:
113  *  * EOS :
114  *     the stream is done => Mark that component as done, optionally freeing/removing it
115  *  * STREAM_START :
116  *     a new stream is starting => link it further if needed
117  *
118  *
119  * 3) Gradual replacement
120  *
121  * If the caps change at any point in decodebin (input sink pad, demuxer output,
122  * multiqueue output, ..), we gradually replace (if needed) the following elements.
123  *
124  * This is handled by the probes in various locations:
125  *  a) typefind output
126  *  b) multiqueue input (source pad of Input Streams)
127  *  c) multiqueue output (source pad of Multiqueue Slots)
128  *  d) final output (target of source ghostpads)
129  *
130  * When CAPS event arrive at those points, one of three things can happen:
131  * a) There is no elements downstream yet, just create/link-to following elements
132  * b) There are downstream elements, do a ACCEPT_CAPS query
133  *  b.1) The new CAPS are accepted, keep current configuration
134  *  b.2) The new CAPS are not accepted, remove following elements then do a)
135  *
136  *
137  *
138  *    Components:
139  *
140  *                                                   MultiQ     Output
141  *                     Input(s)                      Slots      Streams
142  *  /-------------------------------------------\   /-----\  /------------- \
143  *
144  * +-------------------------------------------------------------------------+
145  * |                                                                         |
146  * | +---------------------------------------------+                         |
147  * | |   GstParseBin(s)                            |                         |
148  * | |                +--------------+             |  +-----+                |
149  * | |                |              |---[parser]-[|--| Mul |---[ decoder ]-[|
150  * |]--[ typefind ]---|  demuxer(s)  |------------[|  | ti  |                |
151  * | |                |  (if needed) |---[parser]-[|--| qu  |                |
152  * | |                |              |---[parser]-[|--| eu  |---[ decoder ]-[|
153  * | |                +--------------+             |  +------             ^  |
154  * | +---------------------------------------------+        ^             |  |
155  * |                                               ^        |             |  |
156  * +-----------------------------------------------+--------+-------------+--+
157  *                                                 |        |             |
158  *                                                 |        |             |
159  *                                       Probes  --/--------/-------------/
160  *
161  * ATOMIC SWITCHING
162  *
163  * We want to ensure we re-use decoders when switching streams. This takes place
164  * at the multiqueue output level.
165  *
166  * MAIN CONCEPTS
167  *  1) Activating a stream (i.e. linking a slot to an output) is only done within
168  *    the streaming thread in the multiqueue_src_probe() and only if the
169       stream is in the REQUESTED selection.
170  *  2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
171  *    within the stream thread, but only in a purposefully called IDLE probe
172  *    that calls reassign_slot().
173  *
174  * Based on those two principles, 3 "selection" of streams (stream-id) are used:
175  * 1) requested_selection
176  *    All streams within that list should be activated
177  * 2) active_selection
178  *    List of streams that are exposed by decodebin
179  * 3) to_activate
180  *    List of streams that will be moved to requested_selection in the
181  *    reassign_slot() method (i.e. once a stream was deactivated, and the output
182  *    was retargetted)
183  */
184
185
186 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
187 #define GST_CAT_DEFAULT decodebin3_debug
188
189 #define GST_TYPE_DECODEBIN3      (gst_decodebin3_get_type ())
190
191 #define EXTRA_DEBUG 1
192
193 typedef struct _GstDecodebin3 GstDecodebin3;
194 typedef struct _GstDecodebin3Class GstDecodebin3Class;
195
196 typedef struct _DecodebinInputStream DecodebinInputStream;
197 typedef struct _DecodebinInput DecodebinInput;
198 typedef struct _DecodebinOutputStream DecodebinOutputStream;
199
200 struct _GstDecodebin3
201 {
202   GstBin bin;
203
204   /* input_lock protects the following variables */
205   GMutex input_lock;
206   /* Main input (static sink pad) */
207   DecodebinInput *main_input;
208   /* Supplementary input (request sink pads) */
209   GList *other_inputs;
210   /* counter for input */
211   guint32 input_counter;
212   /* Current stream group_id (default : G_MAXUINT32) */
213   /* FIXME : Needs to be resetted appropriately (when upstream changes ?) */
214   guint32 current_group_id;
215   /* End of variables protected by input_lock */
216
217   GstElement *multiqueue;
218
219   /* FIXME : Mutex for protecting values below */
220   GstStreamCollection *collection;      /* Active collection */
221
222   GList *input_streams;         /* List of DecodebinInputStream for active collection */
223   GList *output_streams;        /* List of DecodebinOutputStream used for output */
224   GList *slots;                 /* List of MultiQueueSlot */
225   guint slot_id;
226
227   /* selection_lock protects access to following variables */
228   GMutex selection_lock;
229   /* requested selection of stream-id to activate post-multiqueue */
230   GList *requested_selection;
231   /* list of stream-id currently activated in output */
232   GList *active_selection;
233   /* List of stream-id that need to be activated (after a stream switch for ex) */
234   GList *to_activate;
235   /* Pending select streams event */
236   guint32 select_streams_seqnum;
237   /* pending list of streams to select (from downstream) */
238   GList *pending_select_streams;
239   /* TRUE if requested_selection was updated, will become FALSE once
240    * it has fully transitioned to active */
241   gboolean selection_updated;
242   /* End of variables protected by selection_lock */
243
244   /* List of pending collections.
245    * FIXME : Is this really needed ? */
246   GList *pending_collection;
247
248
249   /* Factories */
250   GMutex factories_lock;
251   guint32 factories_cookie;
252   /* All DECODABLE factories */
253   GList *factories;
254   /* Only DECODER factories */
255   GList *decoder_factories;
256   /* DECODABLE but not DECODER factories */
257   GList *decodable_factories;
258
259   /* counters for pads */
260   guint32 apadcount, vpadcount, tpadcount, opadcount;
261
262   /* Properties */
263   GstCaps *caps;
264 };
265
266 struct _GstDecodebin3Class
267 {
268   GstBinClass class;
269
270     gint (*select_stream) (GstDecodebin3 * dbin,
271       GstStreamCollection * collection, GstStream * stream);
272 };
273
274 /* Input of decodebin, controls input pad and parsebin */
275 struct _DecodebinInput
276 {
277   GstDecodebin3 *dbin;
278
279   gboolean is_main;
280
281   GstPad *ghost_sink;
282   GstPad *parsebin_sink;
283
284   GstStreamCollection *collection;      /* Active collection */
285
286   guint group_id;
287
288   GstElement *parsebin;
289
290   gulong pad_added_sigid;
291   gulong pad_removed_sigid;
292
293   /* HACK : Remove these fields */
294   /* List of PendingPad structures */
295   GList *pending_pads;
296 };
297
298 /* Multiqueue Slots */
299 typedef struct _MultiQueueSlot
300 {
301   guint id;
302
303   GstDecodebin3 *dbin;
304   /* Type of stream handled by this slot */
305   GstStreamType type;
306
307   /* Linked input and output */
308   DecodebinInputStream *input;
309
310   /* pending => last stream received on sink pad */
311   GstStream *pending_stream;
312   /* active => last stream outputted on source pad */
313   GstStream *active_stream;
314
315   GstPad *sink_pad, *src_pad;
316
317   /* id of the MQ src_pad event probe */
318   gulong probe_id;
319
320   gboolean drain_eos;
321
322   gboolean is_drained;
323
324   DecodebinOutputStream *output;
325 } MultiQueueSlot;
326
327 /* Streams that are exposed downstream (i.e. output) */
328 struct _DecodebinOutputStream
329 {
330   GstDecodebin3 *dbin;
331   /* The type of stream handled by this output stream */
332   GstStreamType type;
333
334   /* The slot to which this output stream is currently connected to */
335   MultiQueueSlot *slot;
336
337   GstElement *decoder;          /* Optional */
338   GstPad *decoder_sink, *decoder_src;
339   gboolean linked;
340
341   /* ghostpad */
342   GstPad *src_pad;
343   /* Flag if ghost pad is exposed */
344   gboolean src_exposed;
345
346   /* keyframe dropping probe */
347   gulong drop_probe_id;
348 };
349
350 /* Pending pads from parsebin */
351 typedef struct _PendingPad
352 {
353   GstDecodebin3 *dbin;
354   DecodebinInput *input;
355   GstPad *pad;
356
357   gulong buffer_probe;
358   gulong event_probe;
359   gboolean saw_eos;
360 } PendingPad;
361
362 /* properties */
363 #define DEFAULT_CAPS (gst_static_caps_get (&default_raw_caps))
364
365 enum
366 {
367   PROP_0,
368   PROP_CAPS
369 };
370
371 /* signals */
372 enum
373 {
374   SIGNAL_SELECT_STREAM,
375   LAST_SIGNAL
376 };
377 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
378
379 #define SELECTION_LOCK(dbin) G_STMT_START {                             \
380     GST_LOG_OBJECT (dbin,                                               \
381                     "selection locking from thread %p",                 \
382                     g_thread_self ());                                  \
383     g_mutex_lock (&dbin->selection_lock);                               \
384     GST_LOG_OBJECT (dbin,                                               \
385                     "selection locked from thread %p",                  \
386                     g_thread_self ());                                  \
387   } G_STMT_END
388
389 #define SELECTION_UNLOCK(dbin) G_STMT_START {                           \
390     GST_LOG_OBJECT (dbin,                                               \
391                     "selection unlocking from thread %p",               \
392                     g_thread_self ());                                  \
393     g_mutex_unlock (&dbin->selection_lock);                             \
394   } G_STMT_END
395
396 #define INPUT_LOCK(dbin) G_STMT_START {                         \
397     GST_LOG_OBJECT (dbin,                                               \
398                     "input locking from thread %p",                     \
399                     g_thread_self ());                                  \
400     g_mutex_lock (&dbin->input_lock);                           \
401     GST_LOG_OBJECT (dbin,                                               \
402                     "input locked from thread %p",                      \
403                     g_thread_self ());                                  \
404   } G_STMT_END
405
406 #define INPUT_UNLOCK(dbin) G_STMT_START {                               \
407     GST_LOG_OBJECT (dbin,                                               \
408                     "input unlocking from thread %p",           \
409                     g_thread_self ());                                  \
410     g_mutex_unlock (&dbin->input_lock);                         \
411   } G_STMT_END
412
413 GType gst_decodebin3_get_type (void);
414 #define gst_decodebin3_parent_class parent_class
415 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
416
417 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
418
419 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
420     GST_PAD_SINK,
421     GST_PAD_ALWAYS,
422     GST_STATIC_CAPS_ANY);
423
424 static GstStaticPadTemplate request_sink_template =
425 GST_STATIC_PAD_TEMPLATE ("sink_%u",
426     GST_PAD_SINK,
427     GST_PAD_REQUEST,
428     GST_STATIC_CAPS_ANY);
429
430 static GstStaticPadTemplate video_src_template =
431 GST_STATIC_PAD_TEMPLATE ("video_%u",
432     GST_PAD_SRC,
433     GST_PAD_SOMETIMES,
434     GST_STATIC_CAPS_ANY);
435
436 static GstStaticPadTemplate audio_src_template =
437 GST_STATIC_PAD_TEMPLATE ("audio_%u",
438     GST_PAD_SRC,
439     GST_PAD_SOMETIMES,
440     GST_STATIC_CAPS_ANY);
441
442 static GstStaticPadTemplate text_src_template =
443 GST_STATIC_PAD_TEMPLATE ("text_%u",
444     GST_PAD_SRC,
445     GST_PAD_SOMETIMES,
446     GST_STATIC_CAPS_ANY);
447
448 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
449     GST_PAD_SRC,
450     GST_PAD_SOMETIMES,
451     GST_STATIC_CAPS_ANY);
452
453
454 static void gst_decodebin3_dispose (GObject * object);
455 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
456     const GValue * value, GParamSpec * pspec);
457 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
458     GValue * value, GParamSpec * pspec);
459
460 static gboolean parsebin_autoplug_continue_cb (GstElement *
461     parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
462
463 static gint
464 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
465     GstStreamCollection * collection, GstStream * stream)
466 {
467   GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
468
469   return -1;
470 }
471
472 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
473     GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
474 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
475 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
476     GstStateChange transition);
477 static gboolean gst_decodebin3_send_event (GstElement * element,
478     GstEvent * event);
479
480 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
481 #if 0
482 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
483     GstElementFactoryListType ftype);
484 #endif
485
486 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
487 static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input);
488 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
489 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
490
491 static void reconfigure_output_stream (DecodebinOutputStream * output,
492     MultiQueueSlot * slot);
493 static void free_output_stream (GstDecodebin3 * dbin,
494     DecodebinOutputStream * output);
495 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
496     GstStreamType type);
497
498 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
499     GstPadProbeInfo * info, MultiQueueSlot * slot);
500 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
501 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
502     DecodebinInputStream * input);
503 static void link_input_to_slot (DecodebinInputStream * input,
504     MultiQueueSlot * slot);
505 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
506 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
507     MultiQueueSlot * slot);
508
509 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
510 static void update_requested_selection (GstDecodebin3 * dbin,
511     GstStreamCollection * collection);
512
513 /* FIXME: Really make all the parser stuff a self-contained helper object */
514 #include "gstdecodebin3-parse.c"
515
516 static gboolean
517 _gst_int_accumulator (GSignalInvocationHint * ihint,
518     GValue * return_accu, const GValue * handler_return, gpointer dummy)
519 {
520   gint res = g_value_get_int (handler_return);
521
522   if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
523     g_value_set_int (return_accu, res);
524
525   if (res == -1)
526     return TRUE;
527
528   return FALSE;
529 }
530
531 static void
532 gst_decodebin3_class_init (GstDecodebin3Class * klass)
533 {
534   GObjectClass *gobject_klass = (GObjectClass *) klass;
535   GstElementClass *element_class = (GstElementClass *) klass;
536   GstBinClass *bin_klass = (GstBinClass *) klass;
537
538   gobject_klass->dispose = gst_decodebin3_dispose;
539   gobject_klass->set_property = gst_decodebin3_set_property;
540   gobject_klass->get_property = gst_decodebin3_get_property;
541
542   /* FIXME : ADD PROPERTIES ! */
543   g_object_class_install_property (gobject_klass, PROP_CAPS,
544       g_param_spec_boxed ("caps", "Caps",
545           "The caps on which to stop decoding. (NULL = default)",
546           GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
547
548   /* FIXME : ADD SIGNALS ! */
549   /**
550    * GstDecodebin3::select-stream
551    * @decodebin: a #GstDecodebin3
552    * @collection: a #GstStreamCollection
553    * @stream: a #GstStream
554    *
555    * This signal is emitted whenever @decodebin needs to decide whether
556    * to expose a @stream of a given @collection.
557    *
558    * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
559    * A value of -1 (default) lets @decodebin decide what to do with the stream.
560    * */
561   gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
562       g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
563       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
564       _gst_int_accumulator, NULL, g_cclosure_marshal_generic,
565       G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
566
567
568   element_class->request_new_pad =
569       GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
570   element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
571   element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
572
573   gst_element_class_add_pad_template (element_class,
574       gst_static_pad_template_get (&sink_template));
575   gst_element_class_add_pad_template (element_class,
576       gst_static_pad_template_get (&request_sink_template));
577   gst_element_class_add_pad_template (element_class,
578       gst_static_pad_template_get (&video_src_template));
579   gst_element_class_add_pad_template (element_class,
580       gst_static_pad_template_get (&audio_src_template));
581   gst_element_class_add_pad_template (element_class,
582       gst_static_pad_template_get (&text_src_template));
583   gst_element_class_add_pad_template (element_class,
584       gst_static_pad_template_get (&src_template));
585
586   gst_element_class_set_static_metadata (element_class,
587       "Decoder Bin 3", "Generic/Bin/Decoder",
588       "Autoplug and decode to raw media",
589       "Edward Hervey <edward@centricular.com>");
590
591   bin_klass->handle_message = gst_decodebin3_handle_message;
592
593   klass->select_stream = gst_decodebin3_select_stream;
594 }
595
596 static void
597 gst_decodebin3_init (GstDecodebin3 * dbin)
598 {
599   /* Create main input */
600   dbin->main_input = create_new_input (dbin, TRUE);
601
602   dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
603   g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
604       "max-size-buffers", 0, "use-interleave", TRUE, NULL);
605   gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
606
607   dbin->current_group_id = G_MAXUINT32;
608
609   g_mutex_init (&dbin->factories_lock);
610   g_mutex_init (&dbin->selection_lock);
611   g_mutex_init (&dbin->input_lock);
612
613   dbin->caps = gst_static_caps_get (&default_raw_caps);
614
615   GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
616 }
617
618 static void
619 gst_decodebin3_dispose (GObject * object)
620 {
621   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
622   GList *walk, *next;
623
624   if (dbin->factories)
625     gst_plugin_feature_list_free (dbin->factories);
626   if (dbin->decoder_factories)
627     g_list_free (dbin->decoder_factories);
628   if (dbin->decodable_factories)
629     g_list_free (dbin->decodable_factories);
630   g_list_free (dbin->requested_selection);
631   g_list_free (dbin->active_selection);
632   g_list_free (dbin->to_activate);
633   g_list_free (dbin->pending_select_streams);
634   g_clear_object (&dbin->collection);
635
636   free_input (dbin, dbin->main_input);
637
638   for (walk = dbin->other_inputs; walk; walk = next) {
639     DecodebinInput *input = walk->data;
640
641     next = g_list_next (walk);
642
643     free_input (dbin, input);
644     dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
645   }
646
647   G_OBJECT_CLASS (parent_class)->dispose (object);
648 }
649
650 static void
651 gst_decodebin3_set_property (GObject * object, guint prop_id,
652     const GValue * value, GParamSpec * pspec)
653 {
654   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
655
656   /* FIXME : IMPLEMENT */
657   switch (prop_id) {
658     case PROP_CAPS:
659       GST_OBJECT_LOCK (dbin);
660       if (dbin->caps)
661         gst_caps_unref (dbin->caps);
662       dbin->caps = g_value_dup_boxed (value);
663       GST_OBJECT_UNLOCK (dbin);
664       break;
665     default:
666       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
667       break;
668   }
669 }
670
671 static void
672 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
673     GParamSpec * pspec)
674 {
675   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
676
677   /* FIXME : IMPLEMENT */
678   switch (prop_id) {
679     case PROP_CAPS:
680       GST_OBJECT_LOCK (dbin);
681       g_value_set_boxed (value, dbin->caps);
682       GST_OBJECT_UNLOCK (dbin);
683       break;
684     default:
685       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
686       break;
687   }
688 }
689
690 static gboolean
691 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
692     GstCaps * caps, GstDecodebin3 * dbin)
693 {
694   GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
695
696   /* If it matches our target caps, expose it */
697   if (gst_caps_can_intersect (caps, dbin->caps))
698     return FALSE;
699
700   return TRUE;
701 }
702
703 /* This method should be called whenever a STREAM_START event
704  * comes out of a given parsebin.
705  * The caller shall replace the group_id if the function returns TRUE */
706 static gboolean
707 set_input_group_id (DecodebinInput * input, guint32 * group_id)
708 {
709   GstDecodebin3 *dbin = input->dbin;
710
711   if (input->group_id != *group_id) {
712     if (input->group_id != G_MAXUINT32)
713       GST_WARNING_OBJECT (dbin,
714           "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
715           ") on input %p ", input->group_id, *group_id, input);
716     input->group_id = *group_id;
717   }
718
719   if (*group_id != dbin->current_group_id) {
720     if (dbin->current_group_id == G_MAXUINT32) {
721       GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT,
722           *group_id);
723       dbin->current_group_id = *group_id;
724     }
725     *group_id = dbin->current_group_id;
726     return TRUE;
727   }
728
729   return FALSE;
730 }
731
732 /* Call with INPUT_LOCK taken */
733 static gboolean
734 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
735 {
736   gboolean set_state = FALSE;
737
738   if (input->parsebin == NULL) {
739     input->parsebin = gst_element_factory_make ("parsebin", NULL);
740     if (input->parsebin == NULL)
741       goto no_parsebin;
742     input->parsebin = gst_object_ref (input->parsebin);
743     input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
744     input->pad_added_sigid =
745         g_signal_connect (input->parsebin, "pad-added",
746         (GCallback) parsebin_pad_added_cb, input);
747     input->pad_removed_sigid =
748         g_signal_connect (input->parsebin, "pad-removed",
749         (GCallback) parsebin_pad_removed_cb, input);
750     g_signal_connect (input->parsebin, "autoplug-continue",
751         (GCallback) parsebin_autoplug_continue_cb, dbin);
752   }
753
754   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
755     gst_bin_add (GST_BIN (dbin), input->parsebin);
756     set_state = TRUE;
757   }
758
759   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
760       input->parsebin_sink);
761   if (set_state)
762     gst_element_sync_state_with_parent (input->parsebin);
763
764   return TRUE;
765
766   /* ERRORS */
767 no_parsebin:
768   {
769     gst_element_post_message ((GstElement *) dbin,
770         gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
771     return FALSE;
772   }
773 }
774
775 static GstPadLinkReturn
776 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
777 {
778   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
779   GstPadLinkReturn res = GST_PAD_LINK_OK;
780   DecodebinInput *input;
781
782   GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT
783       ". Creating parsebin if needed", pad);
784
785   if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
786     goto fail;
787
788   INPUT_LOCK (dbin);
789   if (!ensure_input_parsebin (dbin, input))
790     res = GST_PAD_LINK_REFUSED;
791   INPUT_UNLOCK (dbin);
792
793   return res;
794 fail:
795   GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
796   return GST_PAD_LINK_REFUSED;
797 }
798
799 static void
800 gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
801 {
802   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
803   DecodebinInput *input;
804
805   GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT
806       ". Removing parsebin.", pad);
807
808   if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
809     goto fail;
810
811   INPUT_LOCK (dbin);
812   if (input->parsebin == NULL) {
813     INPUT_UNLOCK (dbin);
814     return;
815   }
816
817   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) {
818     GstStreamCollection *collection = NULL;
819
820     /* Clear stream-collection corresponding to current INPUT and post new
821      * stream-collection message, if needed */
822     if (input->collection) {
823       gst_object_unref (input->collection);
824       input->collection = NULL;
825     }
826
827     collection = get_merged_collection (dbin);
828     if (collection && collection != dbin->collection) {
829       GstMessage *msg;
830       GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
831
832       if (dbin->collection)
833         gst_object_unref (dbin->collection);
834       dbin->collection = collection;
835
836       msg =
837           gst_message_new_stream_collection ((GstObject *) dbin,
838           dbin->collection);
839
840       gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
841       update_requested_selection (dbin, dbin->collection);
842     }
843
844     gst_bin_remove (GST_BIN (dbin), input->parsebin);
845     gst_element_set_state (input->parsebin, GST_STATE_NULL);
846     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
847     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
848     gst_object_unref (input->parsebin);
849     gst_object_unref (input->parsebin_sink);
850
851     input->parsebin = NULL;
852     input->parsebin_sink = NULL;
853
854     if (!input->is_main) {
855       dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
856       free_input_async (dbin, input);
857     }
858   }
859   INPUT_UNLOCK (dbin);
860   return;
861
862 fail:
863   GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
864   return;
865 }
866
867 static void
868 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
869 {
870   GST_DEBUG ("Freeing input %p", input);
871   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
872   gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
873   if (input->parsebin) {
874     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
875     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
876     gst_element_set_state (input->parsebin, GST_STATE_NULL);
877     gst_object_unref (input->parsebin);
878     gst_object_unref (input->parsebin_sink);
879   }
880   if (input->collection)
881     gst_object_unref (input->collection);
882   g_free (input);
883 }
884
885 static void
886 free_input_async (GstDecodebin3 * dbin, DecodebinInput * input)
887 {
888   GST_LOG_OBJECT (dbin, "pushing input %p on thread pool to free", input);
889   gst_element_call_async (GST_ELEMENT_CAST (dbin),
890       (GstElementCallAsyncFunc) free_input, input, NULL);
891 }
892
893 /* Call with INPUT_LOCK taken */
894 static DecodebinInput *
895 create_new_input (GstDecodebin3 * dbin, gboolean main)
896 {
897   DecodebinInput *input;
898
899   input = g_new0 (DecodebinInput, 1);
900   input->dbin = dbin;
901   input->is_main = main;
902   input->group_id = G_MAXUINT32;
903   if (main)
904     input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
905   else {
906     gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
907     input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
908     g_free (pad_name);
909   }
910   g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
911   gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
912   gst_pad_set_unlink_function (input->ghost_sink,
913       gst_decodebin3_input_pad_unlink);
914
915   gst_pad_set_active (input->ghost_sink, TRUE);
916   gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
917
918   return input;
919
920 }
921
922 static GstPad *
923 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
924     const gchar * name, const GstCaps * caps)
925 {
926   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
927   DecodebinInput *input;
928   GstPad *res = NULL;
929
930   /* We are ignoring names for the time being, not sure it makes any sense
931    * within the context of decodebin3 ... */
932   INPUT_LOCK (dbin);
933   input = create_new_input (dbin, FALSE);
934   if (input) {
935     dbin->other_inputs = g_list_append (dbin->other_inputs, input);
936     res = input->ghost_sink;
937   }
938   INPUT_UNLOCK (dbin);
939
940   return res;
941 }
942
943 /* Must be called with factories lock! */
944 static void
945 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
946 {
947   guint cookie;
948
949   cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
950   if (!dbin->factories || dbin->factories_cookie != cookie) {
951     GList *tmp;
952     if (dbin->factories)
953       gst_plugin_feature_list_free (dbin->factories);
954     if (dbin->decoder_factories)
955       g_list_free (dbin->decoder_factories);
956     if (dbin->decodable_factories)
957       g_list_free (dbin->decodable_factories);
958     dbin->factories =
959         gst_element_factory_list_get_elements
960         (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
961     dbin->factories =
962         g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
963     dbin->factories_cookie = cookie;
964
965     /* Filter decoder and other decodables */
966     dbin->decoder_factories = NULL;
967     dbin->decodable_factories = NULL;
968     for (tmp = dbin->factories; tmp; tmp = tmp->next) {
969       GstElementFactory *fact = (GstElementFactory *) tmp->data;
970       if (gst_element_factory_list_is_type (fact,
971               GST_ELEMENT_FACTORY_TYPE_DECODER))
972         dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
973       else
974         dbin->decodable_factories =
975             g_list_append (dbin->decodable_factories, fact);
976     }
977   }
978 }
979
980 /* Must be called with appropriate lock if list is a protected variable */
981 static gboolean
982 stream_in_list (GList * list, const gchar * sid)
983 {
984   GList *tmp;
985
986 #if EXTRA_DEBUG
987   for (tmp = list; tmp; tmp = tmp->next) {
988     gchar *osid = (gchar *) tmp->data;
989     GST_DEBUG ("Checking %s against %s", sid, osid);
990   }
991 #endif
992
993   for (tmp = list; tmp; tmp = tmp->next) {
994     gchar *osid = (gchar *) tmp->data;
995     if (!g_strcmp0 (sid, osid))
996       return TRUE;
997   }
998
999   return FALSE;
1000 }
1001
1002 static void
1003 update_requested_selection (GstDecodebin3 * dbin,
1004     GstStreamCollection * collection)
1005 {
1006   guint i, nb;
1007   GList *tmp = NULL;
1008   GstStreamType used_types = 0;
1009
1010   nb = gst_stream_collection_get_size (collection);
1011
1012   /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1013    *  the switch handler will take care of the pending selection */
1014   SELECTION_LOCK (dbin);
1015   if (dbin->pending_select_streams) {
1016     GST_DEBUG_OBJECT (dbin,
1017         "No need to create pending selection, SELECT_STREAMS underway");
1018     goto beach;
1019   }
1020
1021   /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1022   GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1023
1024   /* 3. If not, check if we already have some of the streams in the
1025    * existing active/requested selection */
1026   for (i = 0; i < nb; i++) {
1027     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1028     const gchar *sid = gst_stream_get_stream_id (stream);
1029     gint request = -1;
1030     /* Fire select-stream signal to see if outside components want to
1031      * hint at which streams should be selected */
1032     g_signal_emit (G_OBJECT (dbin),
1033         gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1034         &request);
1035     GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1036     if (request == 1 || (request == -1
1037             && (stream_in_list (dbin->requested_selection, sid)
1038                 || stream_in_list (dbin->active_selection, sid)))) {
1039       GstStreamType curtype = gst_stream_get_stream_type (stream);
1040       if (request == 1)
1041         GST_DEBUG_OBJECT (dbin,
1042             "Using stream requested by 'select-stream' signal : %s", sid);
1043       else
1044         GST_DEBUG_OBJECT (dbin,
1045             "Re-using stream already present in requested or active selection : %s",
1046             sid);
1047       tmp = g_list_append (tmp, (gchar *) sid);
1048       used_types |= curtype;
1049     }
1050   }
1051
1052   /* 4. If not, match one stream of each type */
1053   for (i = 0; i < nb; i++) {
1054     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1055     GstStreamType curtype = gst_stream_get_stream_type (stream);
1056     if (!(used_types & curtype)) {
1057       const gchar *sid = gst_stream_get_stream_id (stream);
1058       GST_DEBUG_OBJECT (dbin, "Selecting stream '%s' of type %s",
1059           sid, gst_stream_type_get_name (curtype));
1060       tmp = g_list_append (tmp, (gchar *) sid);
1061       used_types |= curtype;
1062     }
1063   }
1064
1065 beach:
1066   /* Finally set the requested selection */
1067   if (tmp) {
1068     if (dbin->requested_selection) {
1069       GST_FIXME_OBJECT (dbin,
1070           "Replacing non-NULL requested_selection, what should we do ??");
1071       g_list_free (dbin->requested_selection);
1072     }
1073     dbin->requested_selection = tmp;
1074     dbin->selection_updated = TRUE;
1075   }
1076   SELECTION_UNLOCK (dbin);
1077 }
1078
1079 /* Call with INPUT_LOCK taken */
1080 static GstStreamCollection *
1081 get_merged_collection (GstDecodebin3 * dbin)
1082 {
1083   gboolean needs_merge = FALSE;
1084   GstStreamCollection *res = NULL;
1085   GList *tmp;
1086   guint i, nb_stream;
1087
1088   /* First check if we need to do a merge or just return the only collection */
1089   res = dbin->main_input->collection;
1090
1091   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1092     DecodebinInput *input = (DecodebinInput *) tmp->data;
1093     if (input->collection) {
1094       if (res) {
1095         needs_merge = TRUE;
1096         break;
1097       }
1098       res = input->collection;
1099     }
1100   }
1101
1102   if (!needs_merge) {
1103     GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1104     return res ? gst_object_ref (res) : NULL;
1105   }
1106
1107   /* We really need to create a new collection */
1108   /* FIXME : Some numbering scheme maybe ?? */
1109   res = gst_stream_collection_new ("decodebin3");
1110   if (dbin->main_input->collection) {
1111     nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1112     GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1113     for (i = 0; i < nb_stream; i++) {
1114       GstStream *stream =
1115           gst_stream_collection_get_stream (dbin->main_input->collection, i);
1116       gst_stream_collection_add_stream (res, gst_object_ref (stream));
1117     }
1118   }
1119
1120   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1121     DecodebinInput *input = (DecodebinInput *) tmp->data;
1122     GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1123         input->collection);
1124     if (input->collection) {
1125       nb_stream = gst_stream_collection_get_size (input->collection);
1126       GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1127       for (i = 0; i < nb_stream; i++) {
1128         GstStream *stream =
1129             gst_stream_collection_get_stream (input->collection, i);
1130         gst_stream_collection_add_stream (res, gst_object_ref (stream));
1131       }
1132     }
1133   }
1134
1135   return res;
1136 }
1137
1138 /* Call with INPUT_LOCK taken */
1139 static DecodebinInput *
1140 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1141 {
1142   DecodebinInput *input = NULL;
1143   GstElement *parent = gst_object_ref (child);
1144   GList *tmp;
1145
1146   do {
1147     GstElement *next_parent;
1148
1149     GST_DEBUG_OBJECT (dbin, "parent %s",
1150         parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1151
1152     if (parent == dbin->main_input->parsebin) {
1153       input = dbin->main_input;
1154       break;
1155     }
1156     for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1157       DecodebinInput *cur = (DecodebinInput *) tmp->data;
1158       if (parent == cur->parsebin) {
1159         input = cur;
1160         break;
1161       }
1162     }
1163     next_parent = (GstElement *) gst_element_get_parent (parent);
1164     gst_object_unref (parent);
1165     parent = next_parent;
1166
1167   } while (parent && parent != (GstElement *) dbin);
1168
1169   if (parent)
1170     gst_object_unref (parent);
1171
1172   return input;
1173 }
1174
1175 static gboolean
1176 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1177 {
1178   guint i, len;
1179
1180   if (dbin->collection == NULL)
1181     return FALSE;
1182   len = gst_stream_collection_get_size (dbin->collection);
1183   for (i = 0; i < len; i++) {
1184     GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1185     const gchar *osid = gst_stream_get_stream_id (stream);
1186     if (!g_strcmp0 (sid, osid))
1187       return TRUE;
1188   }
1189
1190   return FALSE;
1191 }
1192
1193 /* Call with INPUT_LOCK taken */
1194 static void
1195 handle_stream_collection (GstDecodebin3 * dbin,
1196     GstStreamCollection * collection, GstElement * child)
1197 {
1198 #ifndef GST_DISABLE_GST_DEBUG
1199   const gchar *upstream_id;
1200   guint i;
1201 #endif
1202   DecodebinInput *input = find_message_parsebin (dbin, child);
1203
1204   if (!input) {
1205     GST_DEBUG_OBJECT (dbin,
1206         "Couldn't find corresponding input, most likely shutting down");
1207     return;
1208   }
1209
1210   /* Replace collection in input */
1211   if (input->collection)
1212     gst_object_unref (input->collection);
1213   input->collection = gst_object_ref (collection);
1214   GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1215       input);
1216
1217   /* Merge collection if needed */
1218   collection = get_merged_collection (dbin);
1219
1220 #ifndef GST_DISABLE_GST_DEBUG
1221   /* Just some debugging */
1222   upstream_id = gst_stream_collection_get_upstream_id (collection);
1223   GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1224   GST_DEBUG ("From input %p", input);
1225   GST_DEBUG ("  %d streams", gst_stream_collection_get_size (collection));
1226   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1227     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1228     GstTagList *taglist;
1229     GstCaps *caps;
1230
1231     GST_DEBUG ("   Stream '%s'", gst_stream_get_stream_id (stream));
1232     GST_DEBUG ("     type  : %s",
1233         gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1234     GST_DEBUG ("     flags : 0x%x", gst_stream_get_stream_flags (stream));
1235     taglist = gst_stream_get_tags (stream);
1236     GST_DEBUG ("     tags  : %" GST_PTR_FORMAT, taglist);
1237     caps = gst_stream_get_caps (stream);
1238     GST_DEBUG ("     caps  : %" GST_PTR_FORMAT, caps);
1239     if (taglist)
1240       gst_tag_list_unref (taglist);
1241     if (caps)
1242       gst_caps_unref (caps);
1243   }
1244 #endif
1245
1246   /* Store collection for later usage */
1247   if (dbin->collection == NULL) {
1248     dbin->collection = collection;
1249   } else {
1250     /* We need to check who emitted this collection (the owner).
1251      * If we already had a collection from that user, this one is an update,
1252      * that is to say that we need to figure out how we are going to re-use
1253      * the streams/slot */
1254     GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1255     /* FIXME : When do we switch from pending collection to active collection ?
1256      * When all streams from active collection are drained in multiqueue output ? */
1257     gst_object_unref (dbin->collection);
1258     dbin->collection = collection;
1259     /* dbin->pending_collection = */
1260     /*     g_list_append (dbin->pending_collection, collection); */
1261   }
1262 }
1263
1264 static void
1265 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1266 {
1267   GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1268   gboolean posting_collection = FALSE;
1269
1270   GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1271
1272   switch (GST_MESSAGE_TYPE (message)) {
1273     case GST_MESSAGE_STREAM_COLLECTION:
1274     {
1275       GstStreamCollection *collection = NULL;
1276       gst_message_parse_stream_collection (message, &collection);
1277       if (collection) {
1278         INPUT_LOCK (dbin);
1279         handle_stream_collection (dbin, collection,
1280             (GstElement *) GST_MESSAGE_SRC (message));
1281         posting_collection = TRUE;
1282         INPUT_UNLOCK (dbin);
1283       }
1284       if (dbin->collection && collection != dbin->collection) {
1285         /* Replace collection message, we most likely aggregated it */
1286         GstMessage *new_msg;
1287         new_msg =
1288             gst_message_new_stream_collection ((GstObject *) dbin,
1289             dbin->collection);
1290         gst_message_unref (message);
1291         message = new_msg;
1292       }
1293       if (collection)
1294         gst_object_unref (collection);
1295       break;
1296     }
1297     default:
1298       break;
1299   }
1300
1301   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1302
1303   if (posting_collection) {
1304     /* Figure out a selection for that collection */
1305     update_requested_selection (dbin, dbin->collection);
1306   }
1307 }
1308
1309 static DecodebinOutputStream *
1310 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1311 {
1312   GList *tmp;
1313   GstStreamType stype = gst_stream_get_stream_type (stream);
1314
1315   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1316     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1317     if (output->type == stype && output->slot && output->slot->active_stream) {
1318       GstStream *tstream = output->slot->active_stream;
1319       if (!stream_in_list (dbin->requested_selection,
1320               (gchar *) gst_stream_get_stream_id (tstream))) {
1321         return output;
1322       }
1323     }
1324   }
1325
1326   return NULL;
1327 }
1328
1329 /* Give a certain slot, figure out if it should be linked to an
1330  * output stream
1331  * CALL WITH SELECTION LOCK TAKEN !*/
1332 static DecodebinOutputStream *
1333 get_output_for_slot (MultiQueueSlot * slot)
1334 {
1335   GstDecodebin3 *dbin = slot->dbin;
1336   DecodebinOutputStream *output = NULL;
1337   const gchar *stream_id;
1338   GstCaps *caps;
1339
1340   /* If we already have a configured output, just use it */
1341   if (slot->output != NULL)
1342     return slot->output;
1343
1344   /*
1345    * FIXME
1346    *
1347    * This method needs to be split into multiple parts
1348    *
1349    * 1) Figure out whether stream should be exposed or not
1350    *   This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1351    *   in the default stream attribution
1352    *
1353    * 2) Figure out whether an output stream should be created, whether
1354    *   we can re-use the output stream already linked to the slot, or
1355    *   whether we need to get re-assigned another (currently used) output
1356    *   stream.
1357    */
1358
1359   stream_id = gst_stream_get_stream_id (slot->active_stream);
1360   caps = gst_stream_get_caps (slot->active_stream);
1361   GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
1362   gst_caps_unref (caps);
1363
1364   /* 0. Emit autoplug-continue signal for pending caps ? */
1365   GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
1366
1367   /* 1. if in EXPOSE_ALL_MODE, just accept */
1368   GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
1369
1370 #if 0
1371   /* FIXME : The idea around this was to avoid activating a stream for
1372    *     which we have no decoder. Unfortunately it is way too
1373    *     expensive. Need to figure out a better solution */
1374   /* 2. Is there a potential decoder (if one is required) */
1375   if (!gst_caps_can_intersect (caps, dbin->caps)
1376       && !have_factory (dbin, (GstCaps *) caps,
1377           GST_ELEMENT_FACTORY_TYPE_DECODER)) {
1378     GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
1379         caps);
1380     SELECTION_UNLOCK (dbin);
1381     gst_element_post_message (GST_ELEMENT_CAST (dbin),
1382         gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1383     SELECTION_LOCK (dbin);
1384     return NULL;
1385   }
1386 #endif
1387
1388   /* 3. In default mode check if we should expose */
1389   if (stream_in_list (dbin->requested_selection, stream_id)) {
1390     /* Check if we can steal an existing output stream we could re-use.
1391      * that is:
1392      * * an output stream whose slot->stream is not in requested
1393      * * and is of the same type as this stream
1394      */
1395     output = find_free_compatible_output (dbin, slot->active_stream);
1396     if (output) {
1397       /* Move this output from its current slot to this slot */
1398       dbin->to_activate =
1399           g_list_append (dbin->to_activate, (gchar *) stream_id);
1400       dbin->requested_selection =
1401           g_list_remove (dbin->requested_selection, stream_id);
1402       SELECTION_UNLOCK (dbin);
1403       gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1404           (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
1405       SELECTION_LOCK (dbin);
1406       return NULL;
1407     }
1408
1409     output = create_output_stream (dbin, slot->type);
1410     output->slot = slot;
1411     GST_DEBUG ("Linking slot %p to new output %p", slot, output);
1412     slot->output = output;
1413     dbin->active_selection =
1414         g_list_append (dbin->active_selection, (gchar *) stream_id);
1415   } else
1416     GST_DEBUG ("Not creating any output for slot %p", slot);
1417
1418   return output;
1419 }
1420
1421 /* Returns SELECTED_STREAMS message if active_selection is equal to
1422  * requested_selection, else NULL.
1423  * Must be called with LOCK taken */
1424 static GstMessage *
1425 is_selection_done (GstDecodebin3 * dbin)
1426 {
1427   GList *tmp;
1428   GstMessage *msg;
1429
1430   if (!dbin->selection_updated)
1431     return NULL;
1432
1433   GST_LOG_OBJECT (dbin, "Checking");
1434
1435   if (dbin->to_activate != NULL) {
1436     GST_DEBUG ("Still have streams to activate");
1437     return NULL;
1438   }
1439   for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
1440     GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
1441     if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
1442       GST_DEBUG ("Not in active selection, returning");
1443       return NULL;
1444     }
1445   }
1446
1447   GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
1448
1449   /* We are completely active */
1450   msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
1451   GST_MESSAGE_SEQNUM (msg) = dbin->select_streams_seqnum;
1452   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1453     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1454     if (output->slot) {
1455       GST_DEBUG_OBJECT (dbin, "Adding stream %s",
1456           gst_stream_get_stream_id (output->slot->active_stream));
1457
1458       gst_message_streams_selected_add (msg, output->slot->active_stream);
1459     } else
1460       GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
1461   }
1462   dbin->selection_updated = FALSE;
1463   return msg;
1464 }
1465
1466 static GstPadProbeReturn
1467 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
1468     MultiQueueSlot * slot)
1469 {
1470   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
1471   GstDecodebin3 *dbin = slot->dbin;
1472
1473   if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
1474     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
1475
1476     GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
1477     switch (GST_EVENT_TYPE (ev)) {
1478       case GST_EVENT_STREAM_START:
1479       {
1480         GstStream *stream = NULL;
1481         const gchar *stream_id;
1482
1483         gst_event_parse_stream (ev, &stream);
1484         if (stream == NULL) {
1485           GST_ERROR_OBJECT (pad,
1486               "Got a STREAM_START event without a GstStream");
1487           break;
1488         }
1489         slot->is_drained = FALSE;
1490         stream_id = gst_stream_get_stream_id (stream);
1491         GST_DEBUG_OBJECT (pad, "Stream Start '%s'", stream_id);
1492         if (slot->active_stream == NULL) {
1493           slot->active_stream = stream;
1494         } else if (slot->active_stream != stream) {
1495           GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
1496               gst_stream_get_stream_id (slot->active_stream),
1497               gst_stream_get_stream_id (stream));
1498           gst_object_unref (slot->active_stream);
1499           slot->active_stream = stream;
1500         } else
1501           gst_object_unref (stream);
1502 #if 0                           /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
1503         {
1504           gboolean is_active, is_requested;
1505           /* Quick check to see if we're in the current selection */
1506           /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
1507           SELECTION_LOCK (dbin);
1508           GST_DEBUG_OBJECT (dbin, "Checking active selection");
1509           is_active = stream_in_list (dbin->active_selection, stream_id);
1510           GST_DEBUG_OBJECT (dbin, "Checking requested selection");
1511           is_requested = stream_in_list (dbin->requested_selection, stream_id);
1512           SELECTION_UNLOCK (dbin);
1513           if (is_active)
1514             GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
1515                 slot->output);
1516           if (is_requested)
1517             GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
1518                 slot->output);
1519           else if (slot->output) {
1520             GST_DEBUG_OBJECT (pad,
1521                 "Slot needs to be deactivated ? It's no longer in requested selection");
1522           } else if (!is_active)
1523             GST_DEBUG_OBJECT (pad,
1524                 "Slot in neither active nor requested selection");
1525         }
1526 #endif
1527       }
1528         break;
1529       case GST_EVENT_CAPS:
1530       {
1531         /* Configure the output slot if needed */
1532         DecodebinOutputStream *output;
1533         GstMessage *msg = NULL;
1534         SELECTION_LOCK (dbin);
1535         output = get_output_for_slot (slot);
1536         if (output) {
1537           reconfigure_output_stream (output, slot);
1538           msg = is_selection_done (dbin);
1539         }
1540         SELECTION_UNLOCK (dbin);
1541         if (msg)
1542           gst_element_post_message ((GstElement *) slot->dbin, msg);
1543       }
1544         break;
1545       case GST_EVENT_EOS:
1546         /* FIXME : Figure out */
1547         GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
1548             slot->input);
1549         slot->is_drained = TRUE;
1550         if (slot->input == NULL) {
1551           GstPad *peer;
1552           GST_DEBUG_OBJECT (pad,
1553               "last EOS for input, forwarding and removing slot");
1554           peer = gst_pad_get_peer (pad);
1555           if (peer) {
1556             gst_pad_send_event (peer, ev);
1557             gst_object_unref (peer);
1558           } else {
1559             gst_event_unref (ev);
1560           }
1561           SELECTION_LOCK (dbin);
1562           /* FIXME : Shouldn't we try to re-assign the output instead of just
1563            * removing it ? */
1564           /* Remove the output */
1565           if (slot->output) {
1566             DecodebinOutputStream *output = slot->output;
1567             dbin->output_streams = g_list_remove (dbin->output_streams, output);
1568             free_output_stream (dbin, output);
1569           }
1570           dbin->slots = g_list_remove (dbin->slots, slot);
1571           free_multiqueue_slot_async (dbin, slot);
1572           SELECTION_UNLOCK (dbin);
1573           ret = GST_PAD_PROBE_HANDLED;
1574         }
1575         break;
1576       case GST_EVENT_CUSTOM_DOWNSTREAM:
1577         if (gst_event_has_name (ev, "decodebin3-custom-eos")) {
1578           slot->is_drained = TRUE;
1579           SELECTION_LOCK (dbin);
1580           if (slot->input == NULL) {
1581             GST_DEBUG_OBJECT (pad,
1582                 "Got custom-eos from null input stream, remove output stream");
1583             /* Remove the output */
1584             if (slot->output) {
1585               DecodebinOutputStream *output = slot->output;
1586               dbin->output_streams =
1587                   g_list_remove (dbin->output_streams, output);
1588               free_output_stream (dbin, output);
1589             }
1590             dbin->slots = g_list_remove (dbin->slots, slot);
1591             free_multiqueue_slot_async (dbin, slot);
1592           }
1593           SELECTION_UNLOCK (dbin);
1594           ret = GST_PAD_PROBE_DROP;
1595         }
1596         break;
1597       default:
1598         break;
1599     }
1600   } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
1601     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
1602     switch (GST_QUERY_TYPE (query)) {
1603       case GST_QUERY_CAPS:
1604       {
1605         GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
1606         gst_query_set_caps_result (query, GST_CAPS_ANY);
1607         ret = GST_PAD_PROBE_HANDLED;
1608       }
1609         break;
1610
1611       case GST_QUERY_ACCEPT_CAPS:
1612       {
1613         GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
1614         /* If the current decoder doesn't accept caps, we'll reconfigure
1615          * on the actual caps event. So accept any caps. */
1616         gst_query_set_accept_caps_result (query, TRUE);
1617         ret = GST_PAD_PROBE_HANDLED;
1618       }
1619       default:
1620         break;
1621     }
1622   }
1623
1624   return ret;
1625 }
1626
1627 /* Create a new multiqueue slot for the given type
1628  *
1629  * It is up to the caller to know whether that slot is needed or not
1630  * (and release it when no longer needed) */
1631 static MultiQueueSlot *
1632 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
1633 {
1634   MultiQueueSlot *slot;
1635   GstIterator *it = NULL;
1636   GValue item = { 0, };
1637
1638   GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
1639       gst_stream_type_get_name (type));
1640   slot = g_new0 (MultiQueueSlot, 1);
1641   slot->dbin = dbin;
1642   slot->id = dbin->slot_id++;
1643   slot->type = type;
1644   slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u");
1645   if (slot->sink_pad == NULL)
1646     goto fail;
1647   it = gst_pad_iterate_internal_links (slot->sink_pad);
1648   if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
1649       || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
1650     GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
1651         GST_DEBUG_PAD_NAME (slot->src_pad));
1652     goto fail;
1653   }
1654   gst_iterator_free (it);
1655   g_value_reset (&item);
1656
1657   g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
1658
1659   /* Add event probe */
1660   slot->probe_id =
1661       gst_pad_add_probe (slot->src_pad,
1662       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
1663       (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
1664
1665   GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
1666       GST_DEBUG_PAD_NAME (slot->src_pad));
1667   dbin->slots = g_list_append (dbin->slots, slot);
1668   return slot;
1669
1670   /* ERRORS */
1671 fail:
1672   {
1673     if (slot->sink_pad)
1674       gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
1675     g_free (slot);
1676     return NULL;
1677   }
1678 }
1679
1680 /* Must be called with SELECTION_LOCK */
1681 static MultiQueueSlot *
1682 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
1683 {
1684   GList *tmp;
1685   MultiQueueSlot *empty_slot = NULL;
1686   GstStreamType input_type = 0;
1687   gchar *stream_id = NULL;
1688
1689   GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
1690       input, input->active_stream,
1691       input->
1692       active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
1693
1694   if (input->active_stream) {
1695     input_type = gst_stream_get_stream_type (input->active_stream);
1696     stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
1697   }
1698
1699   /* Go over existing slots and check if there is already one for it */
1700   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1701     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1702     /* Already used input, return that one */
1703     if (slot->input == input) {
1704       GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
1705       return slot;
1706     }
1707   }
1708
1709   /* Go amongst all unused slots of the right type and try to find a candidate */
1710   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1711     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1712     if (slot->input == NULL && input_type == slot->type) {
1713       /* Remember this empty slot for later */
1714       empty_slot = slot;
1715       /* Check if available slot is of the same stream_id */
1716       GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
1717           slot->id, slot->active_stream);
1718       if (stream_id && slot->active_stream) {
1719         gchar *ostream_id =
1720             (gchar *) gst_stream_get_stream_id (slot->active_stream);
1721         GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
1722             ostream_id, stream_id);
1723         if (!g_strcmp0 (stream_id, ostream_id))
1724           break;
1725       }
1726     }
1727   }
1728
1729   if (empty_slot) {
1730     GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
1731     empty_slot->input = input;
1732     return empty_slot;
1733   }
1734
1735   if (input_type)
1736     return create_new_slot (dbin, input_type);
1737
1738   return NULL;
1739 }
1740
1741 static void
1742 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
1743 {
1744   GstEvent *event;
1745   if (slot->input != NULL && slot->input != input) {
1746     GST_ERROR_OBJECT (slot->dbin,
1747         "Trying to link input to an already used slot");
1748     return;
1749   }
1750   gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
1751   slot->pending_stream = input->active_stream;
1752   slot->input = input;
1753   event = gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
1754   if (event)
1755     gst_pad_send_event (slot->sink_pad, event);
1756 }
1757
1758 #if 0
1759 static gboolean
1760 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
1761     GstElementFactoryListType ftype)
1762 {
1763   gboolean ret = FALSE;
1764   GList *res;
1765
1766   g_mutex_lock (&dbin->factories_lock);
1767   gst_decode_bin_update_factories_list (dbin);
1768   if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
1769     res =
1770         gst_element_factory_list_filter (dbin->decoder_factories,
1771         caps, GST_PAD_SINK, TRUE);
1772   else
1773     res =
1774         gst_element_factory_list_filter (dbin->decodable_factories,
1775         caps, GST_PAD_SINK, TRUE);
1776   g_mutex_unlock (&dbin->factories_lock);
1777
1778   if (res) {
1779     ret = TRUE;
1780     gst_plugin_feature_list_free (res);
1781   }
1782
1783   return ret;
1784 }
1785 #endif
1786
1787 static GstElement *
1788 create_element (GstDecodebin3 * dbin, GstStream * stream,
1789     GstElementFactoryListType ftype)
1790 {
1791   GList *res;
1792   GstElement *element = NULL;
1793   GstCaps *caps;
1794
1795   g_mutex_lock (&dbin->factories_lock);
1796   gst_decode_bin_update_factories_list (dbin);
1797   caps = gst_stream_get_caps (stream);
1798   if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
1799     res =
1800         gst_element_factory_list_filter (dbin->decoder_factories,
1801         caps, GST_PAD_SINK, TRUE);
1802   else
1803     res =
1804         gst_element_factory_list_filter (dbin->decodable_factories,
1805         caps, GST_PAD_SINK, TRUE);
1806   g_mutex_unlock (&dbin->factories_lock);
1807
1808   if (res) {
1809     element =
1810         gst_element_factory_create ((GstElementFactory *) res->data, NULL);
1811     GST_DEBUG ("Created element '%s'", GST_ELEMENT_NAME (element));
1812     gst_plugin_feature_list_free (res);
1813   } else {
1814     GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT, caps);
1815   }
1816
1817   gst_caps_unref (caps);
1818   return element;
1819 }
1820
1821 /* FIXME : VERY NAIVE. ASSUMING FIRST ONE WILL WORK */
1822 static GstElement *
1823 create_decoder (GstDecodebin3 * dbin, GstStream * stream)
1824 {
1825   return create_element (dbin, stream, GST_ELEMENT_FACTORY_TYPE_DECODER);
1826 }
1827
1828 static GstPadProbeReturn
1829 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
1830     DecodebinOutputStream * output)
1831 {
1832   GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
1833   /* If we have a keyframe, remove the probe and let all data through */
1834   /* FIXME : HANDLE HEADER BUFFER ?? */
1835   if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1836       GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
1837     GST_DEBUG_OBJECT (pad,
1838         "Buffer is keyframe or header, letting through and removing probe");
1839     output->drop_probe_id = 0;
1840     return GST_PAD_PROBE_REMOVE;
1841   }
1842   GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
1843   return GST_PAD_PROBE_DROP;
1844 }
1845
1846 static void
1847 reconfigure_output_stream (DecodebinOutputStream * output,
1848     MultiQueueSlot * slot)
1849 {
1850   GstDecodebin3 *dbin = output->dbin;
1851   GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
1852   gboolean needs_decoder;
1853
1854   needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
1855
1856   GST_DEBUG_OBJECT (dbin,
1857       "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
1858       needs_decoder);
1859
1860   /* FIXME : Maybe make the output un-hook itself automatically ? */
1861   if (output->slot != NULL && output->slot != slot) {
1862     GST_WARNING_OBJECT (dbin,
1863         "Output still linked to another slot (%p)", output->slot);
1864     gst_caps_unref (new_caps);
1865     return;
1866   }
1867
1868   /* Check if existing config is reusable as-is by checking if
1869    * the existing decoder accepts the new caps, if not delete
1870    * it and create a new one */
1871   if (output->decoder) {
1872     gboolean can_reuse_decoder;
1873
1874     if (needs_decoder) {
1875       can_reuse_decoder =
1876           gst_pad_query_accept_caps (output->decoder_sink, new_caps);
1877     } else
1878       can_reuse_decoder = FALSE;
1879
1880     if (can_reuse_decoder) {
1881       if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
1882         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
1883         output->drop_probe_id =
1884             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
1885             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
1886       }
1887       GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
1888       if (output->linked == FALSE) {
1889         gst_pad_link_full (slot->src_pad, output->decoder_sink,
1890             GST_PAD_LINK_CHECK_NOTHING);
1891         output->linked = TRUE;
1892       }
1893       gst_caps_unref (new_caps);
1894       return;
1895     }
1896
1897     GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
1898
1899     if (output->linked)
1900       gst_pad_unlink (slot->src_pad, output->decoder_sink);
1901     output->linked = FALSE;
1902     if (output->drop_probe_id) {
1903       gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
1904       output->drop_probe_id = 0;
1905     }
1906
1907     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
1908       GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
1909       gst_caps_unref (new_caps);
1910       goto cleanup;
1911     }
1912
1913     gst_element_set_locked_state (output->decoder, TRUE);
1914     gst_element_set_state (output->decoder, GST_STATE_NULL);
1915
1916     gst_bin_remove ((GstBin *) dbin, output->decoder);
1917     output->decoder = NULL;
1918   }
1919
1920   gst_caps_unref (new_caps);
1921
1922   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
1923   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
1924
1925   /* If a decoder is required, create one */
1926   if (needs_decoder) {
1927     /* If we don't have a decoder yet, instantiate one */
1928     output->decoder = create_decoder (dbin, slot->active_stream);
1929     if (output->decoder == NULL) {
1930       GstCaps *caps;
1931
1932       SELECTION_UNLOCK (dbin);
1933       /* FIXME : Should we be smarter if there's a missing decoder ?
1934        * Should we deactivate that stream ? */
1935       caps = gst_stream_get_caps (slot->active_stream);
1936       gst_element_post_message (GST_ELEMENT_CAST (dbin),
1937           gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1938       gst_caps_unref (caps);
1939       SELECTION_LOCK (dbin);
1940       goto cleanup;
1941     }
1942     if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
1943       GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
1944       goto cleanup;
1945     }
1946     output->decoder_sink = gst_element_get_static_pad (output->decoder, "sink");
1947     output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
1948     if (output->type & GST_STREAM_TYPE_VIDEO) {
1949       GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
1950       output->drop_probe_id =
1951           gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
1952           (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
1953     }
1954     if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
1955             GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
1956       GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
1957           GST_DEBUG_PAD_NAME (output->decoder_sink));
1958       goto cleanup;
1959     }
1960   } else {
1961     output->decoder_src = gst_object_ref (slot->src_pad);
1962     output->decoder_sink = NULL;
1963   }
1964   output->linked = TRUE;
1965   if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
1966           output->decoder_src)) {
1967     GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
1968     goto cleanup;
1969   }
1970   if (output->src_exposed == FALSE) {
1971     output->src_exposed = TRUE;
1972     gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
1973   }
1974
1975   if (output->decoder)
1976     gst_element_sync_state_with_parent (output->decoder);
1977
1978   output->slot = slot;
1979   return;
1980
1981 cleanup:
1982   {
1983     GST_DEBUG_OBJECT (dbin, "Cleanup");
1984     if (output->decoder_sink) {
1985       gst_object_unref (output->decoder_sink);
1986       output->decoder_sink = NULL;
1987     }
1988     if (output->decoder_src) {
1989       gst_object_unref (output->decoder_src);
1990       output->decoder_src = NULL;
1991     }
1992     if (output->decoder) {
1993       gst_element_set_state (output->decoder, GST_STATE_NULL);
1994       gst_bin_remove ((GstBin *) dbin, output->decoder);
1995       output->decoder = NULL;
1996     }
1997   }
1998 }
1999
2000 static GstPadProbeReturn
2001 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2002 {
2003   GstMessage *msg = NULL;
2004   DecodebinOutputStream *output;
2005
2006   SELECTION_LOCK (slot->dbin);
2007   output = get_output_for_slot (slot);
2008
2009   GST_DEBUG_OBJECT (pad, "output : %p", output);
2010
2011   if (output) {
2012     reconfigure_output_stream (output, slot);
2013     msg = is_selection_done (slot->dbin);
2014   }
2015   SELECTION_UNLOCK (slot->dbin);
2016   if (msg)
2017     gst_element_post_message ((GstElement *) slot->dbin, msg);
2018
2019   return GST_PAD_PROBE_REMOVE;
2020 }
2021
2022 static MultiQueueSlot *
2023 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2024 {
2025   GList *tmp;
2026
2027   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2028     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2029     const gchar *stream_id;
2030     if (slot->active_stream) {
2031       stream_id = gst_stream_get_stream_id (slot->active_stream);
2032       if (!g_strcmp0 (sid, stream_id))
2033         return slot;
2034     }
2035     if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2036       stream_id = gst_stream_get_stream_id (slot->pending_stream);
2037       if (!g_strcmp0 (sid, stream_id))
2038         return slot;
2039     }
2040   }
2041
2042   return NULL;
2043 }
2044
2045 /* This function handles the reassignment of a slot. Call this from
2046  * the streaming thread of a slot. */
2047 static gboolean
2048 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2049 {
2050   DecodebinOutputStream *output;
2051   MultiQueueSlot *target_slot = NULL;
2052   GList *tmp;
2053   const gchar *sid, *tsid;
2054
2055   SELECTION_LOCK (dbin);
2056   output = slot->output;
2057
2058   if (G_UNLIKELY (slot->active_stream == NULL)) {
2059     GST_DEBUG_OBJECT (slot->src_pad,
2060         "Called on inactive slot (active_stream == NULL)");
2061     SELECTION_UNLOCK (dbin);
2062     return FALSE;
2063   }
2064
2065   if (G_UNLIKELY (output == NULL)) {
2066     GST_DEBUG_OBJECT (slot->src_pad,
2067         "Slot doesn't have any output to be removed");
2068     SELECTION_UNLOCK (dbin);
2069     return FALSE;
2070   }
2071
2072   sid = gst_stream_get_stream_id (slot->active_stream);
2073   GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2074
2075   /* Recheck whether this stream is still in the list of streams to deactivate */
2076   if (stream_in_list (dbin->requested_selection, sid)) {
2077     /* Stream is in the list of requested streams, don't remove */
2078     SELECTION_UNLOCK (dbin);
2079     GST_DEBUG_OBJECT (slot->src_pad,
2080         "Stream '%s' doesn't need to be deactivated", sid);
2081     return FALSE;
2082   }
2083
2084   /* Unlink slot from output */
2085   /* FIXME : Handle flushing ? */
2086   /* FIXME : Handle outputs without decoders */
2087   GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2088       output->decoder_sink);
2089   if (output->decoder_sink)
2090     gst_pad_unlink (slot->src_pad, output->decoder_sink);
2091   output->linked = FALSE;
2092   slot->output = NULL;
2093   output->slot = NULL;
2094   /* Remove sid from active selection */
2095   for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2096     if (!g_strcmp0 (sid, tmp->data)) {
2097       dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2098       break;
2099     }
2100
2101   /* Can we re-assign this output to a requested stream ? */
2102   GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2103   for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2104     MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2105     GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2106         tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2107     if (tslot && tslot->type == output->type && tslot->output == NULL) {
2108       GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2109       target_slot = tslot;
2110       tsid = tmp->data;
2111       /* Pass target stream id to requested selection */
2112       dbin->requested_selection =
2113           g_list_append (dbin->requested_selection, tmp->data);
2114       dbin->to_activate = g_list_remove (dbin->to_activate, tmp->data);
2115       break;
2116     }
2117   }
2118
2119   if (target_slot) {
2120     GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2121         target_slot, tsid);
2122     target_slot->output = output;
2123     output->slot = target_slot;
2124     dbin->active_selection =
2125         g_list_append (dbin->active_selection, (gchar *) tsid);
2126     SELECTION_UNLOCK (dbin);
2127
2128     /* Wakeup the target slot so that it retries to send events/buffers
2129      * thereby triggering the output reconfiguration codepath */
2130     gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2131         (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2132     /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2133   } else {
2134     GstMessage *msg;
2135
2136     dbin->output_streams = g_list_remove (dbin->output_streams, output);
2137     free_output_stream (dbin, output);
2138     msg = is_selection_done (slot->dbin);
2139     SELECTION_UNLOCK (dbin);
2140
2141     if (msg)
2142       gst_element_post_message ((GstElement *) slot->dbin, msg);
2143   }
2144
2145   return TRUE;
2146 }
2147
2148 /* Idle probe called when a slot should be unassigned from its output stream.
2149  * This is needed to ensure nothing is flowing when unlinking the slot.
2150  *
2151  * Also, this method will search for a pending stream which could re-use
2152  * the output stream. */
2153 static GstPadProbeReturn
2154 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2155     MultiQueueSlot * slot)
2156 {
2157   GstDecodebin3 *dbin = slot->dbin;
2158
2159   reassign_slot (dbin, slot);
2160
2161   return GST_PAD_PROBE_REMOVE;
2162 }
2163
2164 static gboolean
2165 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2166     guint32 seqnum)
2167 {
2168   gboolean ret = TRUE;
2169   GList *tmp;
2170   /* List of slots to (de)activate. */
2171   GList *to_deactivate = NULL;
2172   GList *to_activate = NULL;
2173   /* List of unknown stream id, most likely means the event
2174    * should be sent upstream so that elements can expose the requested stream */
2175   GList *unknown = NULL;
2176   GList *to_reassign = NULL;
2177   GList *future_request_streams = NULL;
2178   GList *pending_streams = NULL;
2179   GList *slots_to_reassign = NULL;
2180
2181   SELECTION_LOCK (dbin);
2182   if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2183     GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2184     SELECTION_UNLOCK (dbin);
2185     return TRUE;
2186   }
2187   /* Remove pending select_streams */
2188   g_list_free (dbin->pending_select_streams);
2189   dbin->pending_select_streams = NULL;
2190
2191   /* COMPARE the requested streams to the active and requested streams
2192    * on multiqueue. */
2193
2194   /* First check the slots to activate and which ones are unknown */
2195   for (tmp = select_streams; tmp; tmp = tmp->next) {
2196     const gchar *sid = (const gchar *) tmp->data;
2197     MultiQueueSlot *slot;
2198     GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
2199     slot = find_slot_for_stream_id (dbin, sid);
2200     /* Find the corresponding slot */
2201     if (slot == NULL) {
2202       if (stream_in_collection (dbin, (gchar *) sid)) {
2203         pending_streams = g_list_append (pending_streams, (gchar *) sid);
2204       } else {
2205         GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
2206         unknown = g_list_append (unknown, (gchar *) sid);
2207       }
2208     } else if (slot->output == NULL) {
2209       GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
2210           slot, sid);
2211       to_activate = g_list_append (to_activate, slot);
2212     } else {
2213       GST_DEBUG_OBJECT (dbin,
2214           "Stream '%s' from slot %p is already active on output %p", sid, slot,
2215           slot->output);
2216       future_request_streams =
2217           g_list_append (future_request_streams, (gchar *) sid);
2218     }
2219   }
2220
2221   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2222     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2223     /* For slots that have an output, check if it's part of the streams to
2224      * be active */
2225     if (slot->output) {
2226       gboolean slot_to_deactivate = TRUE;
2227
2228       if (slot->active_stream) {
2229         if (stream_in_list (select_streams,
2230                 gst_stream_get_stream_id (slot->active_stream)))
2231           slot_to_deactivate = FALSE;
2232       }
2233       if (slot_to_deactivate && slot->pending_stream
2234           && slot->pending_stream != slot->active_stream) {
2235         if (stream_in_list (select_streams,
2236                 gst_stream_get_stream_id (slot->pending_stream)))
2237           slot_to_deactivate = FALSE;
2238       }
2239       if (slot_to_deactivate) {
2240         GST_DEBUG_OBJECT (dbin,
2241             "Slot %p (%s) should be deactivated, no longer used", slot,
2242             gst_stream_get_stream_id (slot->active_stream));
2243         to_deactivate = g_list_append (to_deactivate, slot);
2244       }
2245     }
2246   }
2247
2248   if (to_deactivate != NULL) {
2249     GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
2250     /* We need to compare what needs to be activated and deactivated in order
2251      * to determine whether there are outputs that can be transferred */
2252     /* Take the stream-id of the slots that are to be activated, for which there
2253      * is a slot of the same type that needs to be deactivated */
2254     tmp = to_deactivate;
2255     while (tmp) {
2256       MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
2257       gboolean removeit = FALSE;
2258       GList *tmp2, *next;
2259       GST_DEBUG_OBJECT (dbin,
2260           "Checking if slot to deactivate (%p) has a candidate slot to activate",
2261           slot_to_deactivate);
2262       for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
2263         MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
2264         GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
2265         if (slot_to_activate->type == slot_to_deactivate->type) {
2266           GST_DEBUG_OBJECT (dbin, "Re-using");
2267           to_reassign = g_list_append (to_reassign, (gchar *)
2268               gst_stream_get_stream_id (slot_to_activate->active_stream));
2269           slots_to_reassign =
2270               g_list_append (slots_to_reassign, slot_to_deactivate);
2271           to_activate = g_list_remove (to_activate, slot_to_activate);
2272           removeit = TRUE;
2273           break;
2274         }
2275       }
2276       next = tmp->next;
2277       if (removeit)
2278         to_deactivate = g_list_delete_link (to_deactivate, tmp);
2279       tmp = next;
2280     }
2281   }
2282
2283   for (tmp = to_deactivate; tmp; tmp = tmp->next) {
2284     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2285     GST_DEBUG_OBJECT (dbin,
2286         "Really need to deactivate slot %p, but no available alternative",
2287         slot);
2288
2289     slots_to_reassign = g_list_append (slots_to_reassign, slot);
2290   }
2291
2292   /* The only slots left to activate are the ones that won't be reassigned and
2293    * therefore really need to have a new output created */
2294   for (tmp = to_activate; tmp; tmp = tmp->next) {
2295     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2296     if (slot->active_stream)
2297       future_request_streams =
2298           g_list_append (future_request_streams,
2299           (gchar *) gst_stream_get_stream_id (slot->active_stream));
2300     else if (slot->pending_stream)
2301       future_request_streams =
2302           g_list_append (future_request_streams,
2303           (gchar *) gst_stream_get_stream_id (slot->pending_stream));
2304     else
2305       GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
2306   }
2307
2308   if (to_activate == NULL && pending_streams != NULL) {
2309     GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
2310     if (dbin->requested_selection)
2311       g_list_free (dbin->requested_selection);
2312     dbin->requested_selection = select_streams;
2313     g_list_free (to_deactivate);
2314     g_list_free (pending_streams);
2315     to_deactivate = NULL;
2316   } else {
2317     if (dbin->requested_selection)
2318       g_list_free (dbin->requested_selection);
2319     dbin->requested_selection = future_request_streams;
2320     dbin->requested_selection =
2321         g_list_concat (dbin->requested_selection, pending_streams);
2322     if (dbin->to_activate)
2323       g_list_free (dbin->to_activate);
2324     dbin->to_activate = to_reassign;
2325   }
2326
2327   dbin->selection_updated = TRUE;
2328   SELECTION_UNLOCK (dbin);
2329
2330   if (unknown)
2331     GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
2332
2333   /* For all streams to deactivate, add an idle probe where we will do
2334    * the unassignment and switch over */
2335   for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
2336     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2337     gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2338         (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
2339   }
2340
2341   return ret;
2342 }
2343
2344 static GstPadProbeReturn
2345 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
2346     DecodebinOutputStream * output)
2347 {
2348   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2349   GstDecodebin3 *dbin = output->dbin;
2350   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
2351
2352   GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
2353
2354   switch (GST_EVENT_TYPE (event)) {
2355     case GST_EVENT_SELECT_STREAMS:
2356     {
2357       GstPad *peer;
2358       GList *streams = NULL;
2359       guint32 seqnum = gst_event_get_seqnum (event);
2360
2361       SELECTION_LOCK (dbin);
2362       if (seqnum == dbin->select_streams_seqnum) {
2363         SELECTION_UNLOCK (dbin);
2364         GST_DEBUG_OBJECT (pad,
2365             "Already handled/handling that SELECT_STREAMS event");
2366         break;
2367       }
2368       dbin->select_streams_seqnum = seqnum;
2369       if (dbin->pending_select_streams != NULL) {
2370         GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2371         g_list_free (dbin->pending_select_streams);
2372         dbin->pending_select_streams = NULL;
2373       }
2374       gst_event_parse_select_streams (event, &streams);
2375       dbin->pending_select_streams = g_list_copy (streams);
2376       SELECTION_UNLOCK (dbin);
2377
2378       /* Send event upstream */
2379       if ((peer = gst_pad_get_peer (pad))) {
2380         gst_pad_send_event (peer, event);
2381         gst_object_unref (peer);
2382       } else {
2383         gst_event_unref (event);
2384       }
2385       /* Finally handle the switch */
2386       if (streams)
2387         handle_stream_switch (dbin, streams, seqnum);
2388       ret = GST_PAD_PROBE_HANDLED;
2389     }
2390       break;
2391     default:
2392       break;
2393   }
2394
2395   return ret;
2396 }
2397
2398 static gboolean
2399 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
2400 {
2401   GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
2402   if (GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
2403     GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2404     GList *streams = NULL;
2405     guint32 seqnum = gst_event_get_seqnum (event);
2406
2407     SELECTION_LOCK (dbin);
2408     if (seqnum == dbin->select_streams_seqnum) {
2409       SELECTION_UNLOCK (dbin);
2410       GST_DEBUG_OBJECT (dbin,
2411           "Already handled/handling that SELECT_STREAMS event");
2412       return TRUE;
2413     }
2414     dbin->select_streams_seqnum = seqnum;
2415     if (dbin->pending_select_streams != NULL) {
2416       GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2417       g_list_free (dbin->pending_select_streams);
2418       dbin->pending_select_streams = NULL;
2419     }
2420     gst_event_parse_select_streams (event, &streams);
2421     dbin->pending_select_streams = g_list_copy (streams);
2422     SELECTION_UNLOCK (dbin);
2423
2424     /* FIXME : We don't have an upstream ?? */
2425 #if 0
2426     /* Send event upstream */
2427     if ((peer = gst_pad_get_peer (pad))) {
2428       gst_pad_send_event (peer, event);
2429       gst_object_unref (peer);
2430     }
2431 #endif
2432     /* Finally handle the switch */
2433     if (streams)
2434       handle_stream_switch (dbin, streams, seqnum);
2435
2436     gst_event_unref (event);
2437     return TRUE;
2438   }
2439   return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
2440 }
2441
2442
2443 static void
2444 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2445 {
2446   if (slot->probe_id)
2447     gst_pad_remove_probe (slot->src_pad, slot->probe_id);
2448   if (slot->input) {
2449     if (slot->input->srcpad)
2450       gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
2451   }
2452
2453   gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2454   gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
2455   gst_object_replace ((GstObject **) & slot->src_pad, NULL);
2456   gst_object_replace ((GstObject **) & slot->active_stream, NULL);
2457   g_free (slot);
2458 }
2459
2460 static void
2461 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2462 {
2463   GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
2464   gst_element_call_async (GST_ELEMENT_CAST (dbin),
2465       (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
2466 }
2467
2468 /* Create a DecodebinOutputStream for a given type
2469  * Note: It will be empty initially, it needs to be configured
2470  * afterwards */
2471 static DecodebinOutputStream *
2472 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
2473 {
2474   DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
2475   gchar *pad_name;
2476   const gchar *prefix;
2477   GstStaticPadTemplate *templ;
2478   GstPadTemplate *ptmpl;
2479   guint32 *counter;
2480   GstPad *internal_pad;
2481
2482   GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
2483       res, gst_stream_type_get_name (type));
2484
2485   res->type = type;
2486   res->dbin = dbin;
2487
2488   if (type & GST_STREAM_TYPE_VIDEO) {
2489     templ = &video_src_template;
2490     counter = &dbin->vpadcount;
2491     prefix = "video";
2492   } else if (type & GST_STREAM_TYPE_AUDIO) {
2493     templ = &audio_src_template;
2494     counter = &dbin->apadcount;
2495     prefix = "audio";
2496   } else if (type & GST_STREAM_TYPE_TEXT) {
2497     templ = &text_src_template;
2498     counter = &dbin->tpadcount;
2499     prefix = "text";
2500   } else {
2501     templ = &src_template;
2502     counter = &dbin->opadcount;
2503     prefix = "src";
2504   }
2505
2506   pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
2507   *counter += 1;
2508   ptmpl = gst_static_pad_template_get (templ);
2509   res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
2510   gst_object_unref (ptmpl);
2511   g_free (pad_name);
2512   gst_pad_set_active (res->src_pad, TRUE);
2513   /* Put an event probe on the internal proxy pad to detect upstream
2514    * events */
2515   internal_pad =
2516       (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
2517   gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
2518       (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
2519   gst_object_unref (internal_pad);
2520
2521   dbin->output_streams = g_list_append (dbin->output_streams, res);
2522
2523   return res;
2524 }
2525
2526 static void
2527 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
2528 {
2529   if (output->slot) {
2530     if (output->decoder_sink && output->decoder)
2531       gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
2532
2533     output->slot->output = NULL;
2534     output->slot = NULL;
2535   }
2536   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2537   gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
2538   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2539   if (output->src_exposed) {
2540     gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
2541   }
2542   if (output->decoder) {
2543     gst_element_set_locked_state (output->decoder, TRUE);
2544     gst_element_set_state (output->decoder, GST_STATE_NULL);
2545     gst_bin_remove ((GstBin *) dbin, output->decoder);
2546   }
2547   g_free (output);
2548 }
2549
2550 static GstStateChangeReturn
2551 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
2552 {
2553   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2554   GstStateChangeReturn ret;
2555
2556   /* Upwards */
2557   switch (transition) {
2558     default:
2559       break;
2560   }
2561   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2562   if (ret == GST_STATE_CHANGE_FAILURE)
2563     goto beach;
2564
2565   switch (transition) {
2566     case GST_STATE_CHANGE_PAUSED_TO_READY:
2567     {
2568       GList *tmp;
2569
2570       /* Free output streams */
2571       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2572         DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2573         free_output_stream (dbin, output);
2574       }
2575       g_list_free (dbin->output_streams);
2576       dbin->output_streams = NULL;
2577       /* Free multiqueue slots */
2578       for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2579         MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2580         free_multiqueue_slot (dbin, slot);
2581       }
2582       g_list_free (dbin->slots);
2583       dbin->slots = NULL;
2584       /* Free inputs */
2585     }
2586       break;
2587     default:
2588       break;
2589   }
2590 beach:
2591   return ret;
2592 }
2593
2594 gboolean
2595 gst_decodebin3_plugin_init (GstPlugin * plugin)
2596 {
2597   GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");
2598
2599   return gst_element_register (plugin, "decodebin3", GST_RANK_NONE,
2600       GST_TYPE_DECODEBIN3);
2601 }