decodebin3: Push EOS to output stream if they are all drained
[platform/upstream/gstreamer.git] / gst / playback / gstdecodebin3.c
1 /* GStreamer
2  *
3  * Copyright (C) <2015> Centricular Ltd
4  *  @author: Edward Hervey <edward@centricular.com>
5  *  @author: Jan Schmidt <jan@centricular.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <glib.h>
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
30 #include <gst/gst.h>
31 #include <gst/pbutils/pbutils.h>
32
33 #include "gstplayback.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
36
37 /**
38  * SECTION:element-decodebin3
39  * @title: decodebin3
40  *
41  * #GstBin that auto-magically constructs a decoding pipeline using available
42  * decoders and demuxers via auto-plugging. The output is raw audio, video
43  * or subtitle streams.
44  *
45  * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
46  *
47  * * supports publication and selection of stream information via
48  * GstStreamCollection messages and #GST_EVENT_SELECT_STREAM events.
49  *
50  * * dynamically switches stream connections internally, and
51  * reuses decoder elements when stream selections change, so that in
52  * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53  * and only creates new elements when streams change and an existing decoder
54  * is not capable of handling the new format.
55  *
56  * * supports multiple input pads for the parallel decoding of auxilliary streams
57  * not muxed with the primary stream.
58  *
59  * * does not handle network stream buffering. decodebin3 expects that network stream
60  * buffering is handled upstream, before data is passed to it.
61  *
62  * <emphasis>decodebin3 is still experimental API and a technology preview.
63  * Its behaviour and exposed API is subject to change.</emphasis>
64  *
65  */
66
67 /**
68  * Global design
69  *
70  * 1) From sink pad to elementary streams (GstParseBin)
71  *
72  * The input sink pads are fed to GstParseBin. GstParseBin will feed them
73  * through typefind. When the caps are detected (or changed) we recursively
74  * figure out which demuxer, parser or depayloader is needed until we get to
75  * elementary streams.
76  *
77  * All elementary streams (whether decoded or not, whether exposed or not) are
78  * fed through multiqueue. There is only *one* multiqueue in decodebin3.
79  *
80  * => MultiQueue is the cornerstone.
81  * => No buffering before multiqueue
82  *
83  * 2) Elementary streams
84  *
85  * After GstParseBin, there are 3 main components:
86  *  1) Input Streams (provided by GstParseBin)
87  *  2) Multiqueue slots
88  *  3) Output Streams
89  *
90  * Input Streams correspond to the stream coming from GstParseBin and that gets
91  * fed into a multiqueue slot.
92  *
93  * Output Streams correspond to the combination of a (optional) decoder and an
94  * output ghostpad. Output Streams can be moved from one multiqueue slot to
95  * another, can reconfigure itself (different decoders), and can be
96  * added/removed depending on the configuration (all streams outputted, only one
97  * of each type, ...).
98  *
99  * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
100  * each 'active' Input Stream there is a corresponding slot.
101  * Slots might have different streams on input and output (due to internal
102  * buffering).
103  *
104  * Due to internal queuing/buffering/..., all those components (might) behave
105  * asynchronously. Therefore probes will be used on each component source pad to
106  * detect various key-points:
107  *  * EOS :
108  *     the stream is done => Mark that component as done, optionally freeing/removing it
109  *  * STREAM_START :
110  *     a new stream is starting => link it further if needed
111  *
112  * 3) Gradual replacement
113  *
114  * If the caps change at any point in decodebin (input sink pad, demuxer output,
115  * multiqueue output, ..), we gradually replace (if needed) the following elements.
116  *
117  * This is handled by the probes in various locations:
118  *  a) typefind output
119  *  b) multiqueue input (source pad of Input Streams)
120  *  c) multiqueue output (source pad of Multiqueue Slots)
121  *  d) final output (target of source ghostpads)
122  *
123  * When CAPS event arrive at those points, one of three things can happen:
124  * a) There is no elements downstream yet, just create/link-to following elements
125  * b) There are downstream elements, do a ACCEPT_CAPS query
126  *  b.1) The new CAPS are accepted, keep current configuration
127  *  b.2) The new CAPS are not accepted, remove following elements then do a)
128  *
129  *    Components:
130  *
131  *                                                   MultiQ     Output
132  *                     Input(s)                      Slots      Streams
133  *  /-------------------------------------------\   /-----\  /------------- \
134  *
135  * +-------------------------------------------------------------------------+
136  * |                                                                         |
137  * | +---------------------------------------------+                         |
138  * | |   GstParseBin(s)                            |                         |
139  * | |                +--------------+             |  +-----+                |
140  * | |                |              |---[parser]-[|--| Mul |---[ decoder ]-[|
141  * |]--[ typefind ]---|  demuxer(s)  |------------[|  | ti  |                |
142  * | |                |  (if needed) |---[parser]-[|--| qu  |                |
143  * | |                |              |---[parser]-[|--| eu  |---[ decoder ]-[|
144  * | |                +--------------+             |  +------             ^  |
145  * | +---------------------------------------------+        ^             |  |
146  * |                                               ^        |             |  |
147  * +-----------------------------------------------+--------+-------------+--+
148  *                                                 |        |             |
149  *                                                 |        |             |
150  *                                       Probes  --/--------/-------------/
151  *
152  * ATOMIC SWITCHING
153  *
154  * We want to ensure we re-use decoders when switching streams. This takes place
155  * at the multiqueue output level.
156  *
157  * MAIN CONCEPTS
158  *  1) Activating a stream (i.e. linking a slot to an output) is only done within
159  *    the streaming thread in the multiqueue_src_probe() and only if the
160       stream is in the REQUESTED selection.
161  *  2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
162  *    within the stream thread, but only in a purposefully called IDLE probe
163  *    that calls reassign_slot().
164  *
165  * Based on those two principles, 3 "selection" of streams (stream-id) are used:
166  * 1) requested_selection
167  *    All streams within that list should be activated
168  * 2) active_selection
169  *    List of streams that are exposed by decodebin
170  * 3) to_activate
171  *    List of streams that will be moved to requested_selection in the
172  *    reassign_slot() method (i.e. once a stream was deactivated, and the output
173  *    was retargetted)
174  */
175
176
177 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
178 #define GST_CAT_DEFAULT decodebin3_debug
179
180 #define GST_TYPE_DECODEBIN3      (gst_decodebin3_get_type ())
181
182 #define EXTRA_DEBUG 1
183
184 typedef struct _GstDecodebin3 GstDecodebin3;
185 typedef struct _GstDecodebin3Class GstDecodebin3Class;
186
187 typedef struct _DecodebinInputStream DecodebinInputStream;
188 typedef struct _DecodebinInput DecodebinInput;
189 typedef struct _DecodebinOutputStream DecodebinOutputStream;
190
191 struct _GstDecodebin3
192 {
193   GstBin bin;
194
195   /* input_lock protects the following variables */
196   GMutex input_lock;
197   /* Main input (static sink pad) */
198   DecodebinInput *main_input;
199   /* Supplementary input (request sink pads) */
200   GList *other_inputs;
201   /* counter for input */
202   guint32 input_counter;
203   /* Current stream group_id (default : G_MAXUINT32) */
204   /* FIXME : Needs to be resetted appropriately (when upstream changes ?) */
205   guint32 current_group_id;
206   /* End of variables protected by input_lock */
207
208   GstElement *multiqueue;
209
210   /* FIXME : Mutex for protecting values below */
211   GstStreamCollection *collection;      /* Active collection */
212
213   GList *input_streams;         /* List of DecodebinInputStream for active collection */
214   GList *output_streams;        /* List of DecodebinOutputStream used for output */
215   GList *slots;                 /* List of MultiQueueSlot */
216   guint slot_id;
217
218   /* selection_lock protects access to following variables */
219   GMutex selection_lock;
220   /* requested selection of stream-id to activate post-multiqueue */
221   GList *requested_selection;
222   /* list of stream-id currently activated in output */
223   GList *active_selection;
224   /* List of stream-id that need to be activated (after a stream switch for ex) */
225   GList *to_activate;
226   /* Pending select streams event */
227   guint32 select_streams_seqnum;
228   /* pending list of streams to select (from downstream) */
229   GList *pending_select_streams;
230   /* TRUE if requested_selection was updated, will become FALSE once
231    * it has fully transitioned to active */
232   gboolean selection_updated;
233   /* End of variables protected by selection_lock */
234
235   /* List of pending collections.
236    * FIXME : Is this really needed ? */
237   GList *pending_collection;
238
239
240   /* Factories */
241   GMutex factories_lock;
242   guint32 factories_cookie;
243   /* All DECODABLE factories */
244   GList *factories;
245   /* Only DECODER factories */
246   GList *decoder_factories;
247   /* DECODABLE but not DECODER factories */
248   GList *decodable_factories;
249
250   /* counters for pads */
251   guint32 apadcount, vpadcount, tpadcount, opadcount;
252
253   /* Properties */
254   GstCaps *caps;
255 };
256
257 struct _GstDecodebin3Class
258 {
259   GstBinClass class;
260
261     gint (*select_stream) (GstDecodebin3 * dbin,
262       GstStreamCollection * collection, GstStream * stream);
263 };
264
265 /* Input of decodebin, controls input pad and parsebin */
266 struct _DecodebinInput
267 {
268   GstDecodebin3 *dbin;
269
270   gboolean is_main;
271
272   GstPad *ghost_sink;
273   GstPad *parsebin_sink;
274
275   GstStreamCollection *collection;      /* Active collection */
276
277   guint group_id;
278
279   GstElement *parsebin;
280
281   gulong pad_added_sigid;
282   gulong pad_removed_sigid;
283
284   /* HACK : Remove these fields */
285   /* List of PendingPad structures */
286   GList *pending_pads;
287 };
288
289 /* Multiqueue Slots */
290 typedef struct _MultiQueueSlot
291 {
292   guint id;
293
294   GstDecodebin3 *dbin;
295   /* Type of stream handled by this slot */
296   GstStreamType type;
297
298   /* Linked input and output */
299   DecodebinInputStream *input;
300
301   /* pending => last stream received on sink pad */
302   GstStream *pending_stream;
303   /* active => last stream outputted on source pad */
304   GstStream *active_stream;
305
306   GstPad *sink_pad, *src_pad;
307
308   /* id of the MQ src_pad event probe */
309   gulong probe_id;
310
311   gboolean is_drained;
312
313   DecodebinOutputStream *output;
314 } MultiQueueSlot;
315
316 /* Streams that are exposed downstream (i.e. output) */
317 struct _DecodebinOutputStream
318 {
319   GstDecodebin3 *dbin;
320   /* The type of stream handled by this output stream */
321   GstStreamType type;
322
323   /* The slot to which this output stream is currently connected to */
324   MultiQueueSlot *slot;
325
326   GstElement *decoder;          /* Optional */
327   GstPad *decoder_sink, *decoder_src;
328   gboolean linked;
329
330   /* ghostpad */
331   GstPad *src_pad;
332   /* Flag if ghost pad is exposed */
333   gboolean src_exposed;
334
335   /* keyframe dropping probe */
336   gulong drop_probe_id;
337 };
338
339 /* Pending pads from parsebin */
340 typedef struct _PendingPad
341 {
342   GstDecodebin3 *dbin;
343   DecodebinInput *input;
344   GstPad *pad;
345
346   gulong buffer_probe;
347   gulong event_probe;
348   gboolean saw_eos;
349 } PendingPad;
350
351 /* properties */
352 #define DEFAULT_CAPS (gst_static_caps_get (&default_raw_caps))
353
354 enum
355 {
356   PROP_0,
357   PROP_CAPS
358 };
359
360 /* signals */
361 enum
362 {
363   SIGNAL_SELECT_STREAM,
364   LAST_SIGNAL
365 };
366 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
367
368 #define SELECTION_LOCK(dbin) G_STMT_START {                             \
369     GST_LOG_OBJECT (dbin,                                               \
370                     "selection locking from thread %p",                 \
371                     g_thread_self ());                                  \
372     g_mutex_lock (&dbin->selection_lock);                               \
373     GST_LOG_OBJECT (dbin,                                               \
374                     "selection locked from thread %p",                  \
375                     g_thread_self ());                                  \
376   } G_STMT_END
377
378 #define SELECTION_UNLOCK(dbin) G_STMT_START {                           \
379     GST_LOG_OBJECT (dbin,                                               \
380                     "selection unlocking from thread %p",               \
381                     g_thread_self ());                                  \
382     g_mutex_unlock (&dbin->selection_lock);                             \
383   } G_STMT_END
384
385 #define INPUT_LOCK(dbin) G_STMT_START {                         \
386     GST_LOG_OBJECT (dbin,                                               \
387                     "input locking from thread %p",                     \
388                     g_thread_self ());                                  \
389     g_mutex_lock (&dbin->input_lock);                           \
390     GST_LOG_OBJECT (dbin,                                               \
391                     "input locked from thread %p",                      \
392                     g_thread_self ());                                  \
393   } G_STMT_END
394
395 #define INPUT_UNLOCK(dbin) G_STMT_START {                               \
396     GST_LOG_OBJECT (dbin,                                               \
397                     "input unlocking from thread %p",           \
398                     g_thread_self ());                                  \
399     g_mutex_unlock (&dbin->input_lock);                         \
400   } G_STMT_END
401
402 GType gst_decodebin3_get_type (void);
403 #define gst_decodebin3_parent_class parent_class
404 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
405
406 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
407
408 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
409     GST_PAD_SINK,
410     GST_PAD_ALWAYS,
411     GST_STATIC_CAPS_ANY);
412
413 static GstStaticPadTemplate request_sink_template =
414 GST_STATIC_PAD_TEMPLATE ("sink_%u",
415     GST_PAD_SINK,
416     GST_PAD_REQUEST,
417     GST_STATIC_CAPS_ANY);
418
419 static GstStaticPadTemplate video_src_template =
420 GST_STATIC_PAD_TEMPLATE ("video_%u",
421     GST_PAD_SRC,
422     GST_PAD_SOMETIMES,
423     GST_STATIC_CAPS_ANY);
424
425 static GstStaticPadTemplate audio_src_template =
426 GST_STATIC_PAD_TEMPLATE ("audio_%u",
427     GST_PAD_SRC,
428     GST_PAD_SOMETIMES,
429     GST_STATIC_CAPS_ANY);
430
431 static GstStaticPadTemplate text_src_template =
432 GST_STATIC_PAD_TEMPLATE ("text_%u",
433     GST_PAD_SRC,
434     GST_PAD_SOMETIMES,
435     GST_STATIC_CAPS_ANY);
436
437 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
438     GST_PAD_SRC,
439     GST_PAD_SOMETIMES,
440     GST_STATIC_CAPS_ANY);
441
442
443 static void gst_decodebin3_dispose (GObject * object);
444 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
445     const GValue * value, GParamSpec * pspec);
446 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
447     GValue * value, GParamSpec * pspec);
448
449 static gboolean parsebin_autoplug_continue_cb (GstElement *
450     parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
451
452 static gint
453 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
454     GstStreamCollection * collection, GstStream * stream)
455 {
456   GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
457
458   return -1;
459 }
460
461 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
462     GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
463 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
464 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
465     GstStateChange transition);
466 static gboolean gst_decodebin3_send_event (GstElement * element,
467     GstEvent * event);
468
469 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
470 #if 0
471 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
472     GstElementFactoryListType ftype);
473 #endif
474
475 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
476 static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input);
477 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
478 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
479
480 static void reconfigure_output_stream (DecodebinOutputStream * output,
481     MultiQueueSlot * slot);
482 static void free_output_stream (GstDecodebin3 * dbin,
483     DecodebinOutputStream * output);
484 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
485     GstStreamType type);
486
487 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
488     GstPadProbeInfo * info, MultiQueueSlot * slot);
489 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
490 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
491     DecodebinInputStream * input);
492 static void link_input_to_slot (DecodebinInputStream * input,
493     MultiQueueSlot * slot);
494 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
495 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
496     MultiQueueSlot * slot);
497
498 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
499 static void update_requested_selection (GstDecodebin3 * dbin,
500     GstStreamCollection * collection);
501
502 /* FIXME: Really make all the parser stuff a self-contained helper object */
503 #include "gstdecodebin3-parse.c"
504
505 static gboolean
506 _gst_int_accumulator (GSignalInvocationHint * ihint,
507     GValue * return_accu, const GValue * handler_return, gpointer dummy)
508 {
509   gint res = g_value_get_int (handler_return);
510
511   if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
512     g_value_set_int (return_accu, res);
513
514   if (res == -1)
515     return TRUE;
516
517   return FALSE;
518 }
519
520 static void
521 gst_decodebin3_class_init (GstDecodebin3Class * klass)
522 {
523   GObjectClass *gobject_klass = (GObjectClass *) klass;
524   GstElementClass *element_class = (GstElementClass *) klass;
525   GstBinClass *bin_klass = (GstBinClass *) klass;
526
527   gobject_klass->dispose = gst_decodebin3_dispose;
528   gobject_klass->set_property = gst_decodebin3_set_property;
529   gobject_klass->get_property = gst_decodebin3_get_property;
530
531   /* FIXME : ADD PROPERTIES ! */
532   g_object_class_install_property (gobject_klass, PROP_CAPS,
533       g_param_spec_boxed ("caps", "Caps",
534           "The caps on which to stop decoding. (NULL = default)",
535           GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
536
537   /* FIXME : ADD SIGNALS ! */
538   /**
539    * GstDecodebin3::select-stream
540    * @decodebin: a #GstDecodebin3
541    * @collection: a #GstStreamCollection
542    * @stream: a #GstStream
543    *
544    * This signal is emitted whenever @decodebin needs to decide whether
545    * to expose a @stream of a given @collection.
546    *
547    * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
548    * A value of -1 (default) lets @decodebin decide what to do with the stream.
549    * */
550   gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
551       g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
552       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
553       _gst_int_accumulator, NULL, g_cclosure_marshal_generic,
554       G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
555
556
557   element_class->request_new_pad =
558       GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
559   element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
560   element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
561
562   gst_element_class_add_pad_template (element_class,
563       gst_static_pad_template_get (&sink_template));
564   gst_element_class_add_pad_template (element_class,
565       gst_static_pad_template_get (&request_sink_template));
566   gst_element_class_add_pad_template (element_class,
567       gst_static_pad_template_get (&video_src_template));
568   gst_element_class_add_pad_template (element_class,
569       gst_static_pad_template_get (&audio_src_template));
570   gst_element_class_add_pad_template (element_class,
571       gst_static_pad_template_get (&text_src_template));
572   gst_element_class_add_pad_template (element_class,
573       gst_static_pad_template_get (&src_template));
574
575   gst_element_class_set_static_metadata (element_class,
576       "Decoder Bin 3", "Generic/Bin/Decoder",
577       "Autoplug and decode to raw media",
578       "Edward Hervey <edward@centricular.com>");
579
580   bin_klass->handle_message = gst_decodebin3_handle_message;
581
582   klass->select_stream = gst_decodebin3_select_stream;
583 }
584
585 static void
586 gst_decodebin3_init (GstDecodebin3 * dbin)
587 {
588   /* Create main input */
589   dbin->main_input = create_new_input (dbin, TRUE);
590
591   dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
592   g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
593       "max-size-buffers", 0, "use-interleave", TRUE, NULL);
594   gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
595
596   dbin->current_group_id = G_MAXUINT32;
597
598   g_mutex_init (&dbin->factories_lock);
599   g_mutex_init (&dbin->selection_lock);
600   g_mutex_init (&dbin->input_lock);
601
602   dbin->caps = gst_static_caps_get (&default_raw_caps);
603
604   GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
605 }
606
607 static void
608 gst_decodebin3_dispose (GObject * object)
609 {
610   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
611   GList *walk, *next;
612
613   if (dbin->factories)
614     gst_plugin_feature_list_free (dbin->factories);
615   if (dbin->decoder_factories)
616     g_list_free (dbin->decoder_factories);
617   if (dbin->decodable_factories)
618     g_list_free (dbin->decodable_factories);
619   g_list_free_full (dbin->requested_selection, g_free);
620   g_list_free (dbin->active_selection);
621   g_list_free (dbin->to_activate);
622   g_list_free (dbin->pending_select_streams);
623   g_clear_object (&dbin->collection);
624
625   free_input (dbin, dbin->main_input);
626
627   for (walk = dbin->other_inputs; walk; walk = next) {
628     DecodebinInput *input = walk->data;
629
630     next = g_list_next (walk);
631
632     free_input (dbin, input);
633     dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
634   }
635
636   G_OBJECT_CLASS (parent_class)->dispose (object);
637 }
638
639 static void
640 gst_decodebin3_set_property (GObject * object, guint prop_id,
641     const GValue * value, GParamSpec * pspec)
642 {
643   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
644
645   /* FIXME : IMPLEMENT */
646   switch (prop_id) {
647     case PROP_CAPS:
648       GST_OBJECT_LOCK (dbin);
649       if (dbin->caps)
650         gst_caps_unref (dbin->caps);
651       dbin->caps = g_value_dup_boxed (value);
652       GST_OBJECT_UNLOCK (dbin);
653       break;
654     default:
655       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
656       break;
657   }
658 }
659
660 static void
661 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
662     GParamSpec * pspec)
663 {
664   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
665
666   /* FIXME : IMPLEMENT */
667   switch (prop_id) {
668     case PROP_CAPS:
669       GST_OBJECT_LOCK (dbin);
670       g_value_set_boxed (value, dbin->caps);
671       GST_OBJECT_UNLOCK (dbin);
672       break;
673     default:
674       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
675       break;
676   }
677 }
678
679 static gboolean
680 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
681     GstCaps * caps, GstDecodebin3 * dbin)
682 {
683   GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
684
685   /* If it matches our target caps, expose it */
686   if (gst_caps_can_intersect (caps, dbin->caps))
687     return FALSE;
688
689   return TRUE;
690 }
691
692 /* This method should be called whenever a STREAM_START event
693  * comes out of a given parsebin.
694  * The caller shall replace the group_id if the function returns TRUE */
695 static gboolean
696 set_input_group_id (DecodebinInput * input, guint32 * group_id)
697 {
698   GstDecodebin3 *dbin = input->dbin;
699
700   if (input->group_id != *group_id) {
701     if (input->group_id != G_MAXUINT32)
702       GST_WARNING_OBJECT (dbin,
703           "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
704           ") on input %p ", input->group_id, *group_id, input);
705     input->group_id = *group_id;
706   }
707
708   if (*group_id != dbin->current_group_id) {
709     if (dbin->current_group_id == G_MAXUINT32) {
710       GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT,
711           *group_id);
712       dbin->current_group_id = *group_id;
713     }
714     *group_id = dbin->current_group_id;
715     return TRUE;
716   }
717
718   return FALSE;
719 }
720
721 /* Call with INPUT_LOCK taken */
722 static gboolean
723 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
724 {
725   gboolean set_state = FALSE;
726
727   if (input->parsebin == NULL) {
728     input->parsebin = gst_element_factory_make ("parsebin", NULL);
729     if (input->parsebin == NULL)
730       goto no_parsebin;
731     input->parsebin = gst_object_ref (input->parsebin);
732     input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
733     input->pad_added_sigid =
734         g_signal_connect (input->parsebin, "pad-added",
735         (GCallback) parsebin_pad_added_cb, input);
736     input->pad_removed_sigid =
737         g_signal_connect (input->parsebin, "pad-removed",
738         (GCallback) parsebin_pad_removed_cb, input);
739     g_signal_connect (input->parsebin, "autoplug-continue",
740         (GCallback) parsebin_autoplug_continue_cb, dbin);
741   }
742
743   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
744     gst_bin_add (GST_BIN (dbin), input->parsebin);
745     set_state = TRUE;
746   }
747
748   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
749       input->parsebin_sink);
750   if (set_state)
751     gst_element_sync_state_with_parent (input->parsebin);
752
753   return TRUE;
754
755   /* ERRORS */
756 no_parsebin:
757   {
758     gst_element_post_message ((GstElement *) dbin,
759         gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
760     return FALSE;
761   }
762 }
763
764 static GstPadLinkReturn
765 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
766 {
767   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
768   GstPadLinkReturn res = GST_PAD_LINK_OK;
769   DecodebinInput *input;
770
771   GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT
772       ". Creating parsebin if needed", pad);
773
774   if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
775     goto fail;
776
777   INPUT_LOCK (dbin);
778   if (!ensure_input_parsebin (dbin, input))
779     res = GST_PAD_LINK_REFUSED;
780   INPUT_UNLOCK (dbin);
781
782   return res;
783 fail:
784   GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
785   return GST_PAD_LINK_REFUSED;
786 }
787
788 /* Drop duration query during _input_pad_unlink */
789 static GstPadProbeReturn
790 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
791     DecodebinInput * input)
792 {
793   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
794
795   if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
796     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
797     if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
798       GST_LOG_OBJECT (pad, "stop forwarding query duration");
799       ret = GST_PAD_PROBE_HANDLED;
800     }
801   }
802
803   return ret;
804 }
805
806 static void
807 gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
808 {
809   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
810   DecodebinInput *input;
811
812   GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT
813       ". Removing parsebin.", pad);
814
815   if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
816     goto fail;
817
818   INPUT_LOCK (dbin);
819   if (input->parsebin == NULL) {
820     INPUT_UNLOCK (dbin);
821     return;
822   }
823
824   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) {
825     GstStreamCollection *collection = NULL;
826     gulong probe_id = gst_pad_add_probe (input->parsebin_sink,
827         GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
828         (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
829
830     /* Clear stream-collection corresponding to current INPUT and post new
831      * stream-collection message, if needed */
832     if (input->collection) {
833       gst_object_unref (input->collection);
834       input->collection = NULL;
835     }
836
837     collection = get_merged_collection (dbin);
838     if (collection && collection != dbin->collection) {
839       GstMessage *msg;
840       GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
841
842       if (dbin->collection)
843         gst_object_unref (dbin->collection);
844       dbin->collection = collection;
845
846       msg =
847           gst_message_new_stream_collection ((GstObject *) dbin,
848           dbin->collection);
849
850       gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
851       update_requested_selection (dbin, dbin->collection);
852     }
853
854     gst_bin_remove (GST_BIN (dbin), input->parsebin);
855     gst_element_set_state (input->parsebin, GST_STATE_NULL);
856     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
857     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
858     gst_pad_remove_probe (input->parsebin_sink, probe_id);
859     gst_object_unref (input->parsebin);
860     gst_object_unref (input->parsebin_sink);
861
862     input->parsebin = NULL;
863     input->parsebin_sink = NULL;
864
865     if (!input->is_main) {
866       dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
867       free_input_async (dbin, input);
868     }
869   }
870   INPUT_UNLOCK (dbin);
871   return;
872
873 fail:
874   GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
875   return;
876 }
877
878 static void
879 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
880 {
881   GST_DEBUG ("Freeing input %p", input);
882   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
883   gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
884   if (input->parsebin) {
885     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
886     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
887     gst_element_set_state (input->parsebin, GST_STATE_NULL);
888     gst_object_unref (input->parsebin);
889     gst_object_unref (input->parsebin_sink);
890   }
891   if (input->collection)
892     gst_object_unref (input->collection);
893   g_free (input);
894 }
895
896 static void
897 free_input_async (GstDecodebin3 * dbin, DecodebinInput * input)
898 {
899   GST_LOG_OBJECT (dbin, "pushing input %p on thread pool to free", input);
900   gst_element_call_async (GST_ELEMENT_CAST (dbin),
901       (GstElementCallAsyncFunc) free_input, input, NULL);
902 }
903
904 /* Call with INPUT_LOCK taken */
905 static DecodebinInput *
906 create_new_input (GstDecodebin3 * dbin, gboolean main)
907 {
908   DecodebinInput *input;
909
910   input = g_new0 (DecodebinInput, 1);
911   input->dbin = dbin;
912   input->is_main = main;
913   input->group_id = G_MAXUINT32;
914   if (main)
915     input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
916   else {
917     gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
918     input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
919     g_free (pad_name);
920   }
921   g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
922   gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
923   gst_pad_set_unlink_function (input->ghost_sink,
924       gst_decodebin3_input_pad_unlink);
925
926   gst_pad_set_active (input->ghost_sink, TRUE);
927   gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
928
929   return input;
930
931 }
932
933 static GstPad *
934 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
935     const gchar * name, const GstCaps * caps)
936 {
937   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
938   DecodebinInput *input;
939   GstPad *res = NULL;
940
941   /* We are ignoring names for the time being, not sure it makes any sense
942    * within the context of decodebin3 ... */
943   INPUT_LOCK (dbin);
944   input = create_new_input (dbin, FALSE);
945   if (input) {
946     dbin->other_inputs = g_list_append (dbin->other_inputs, input);
947     res = input->ghost_sink;
948   }
949   INPUT_UNLOCK (dbin);
950
951   return res;
952 }
953
954 /* Must be called with factories lock! */
955 static void
956 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
957 {
958   guint cookie;
959
960   cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
961   if (!dbin->factories || dbin->factories_cookie != cookie) {
962     GList *tmp;
963     if (dbin->factories)
964       gst_plugin_feature_list_free (dbin->factories);
965     if (dbin->decoder_factories)
966       g_list_free (dbin->decoder_factories);
967     if (dbin->decodable_factories)
968       g_list_free (dbin->decodable_factories);
969     dbin->factories =
970         gst_element_factory_list_get_elements
971         (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
972     dbin->factories =
973         g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
974     dbin->factories_cookie = cookie;
975
976     /* Filter decoder and other decodables */
977     dbin->decoder_factories = NULL;
978     dbin->decodable_factories = NULL;
979     for (tmp = dbin->factories; tmp; tmp = tmp->next) {
980       GstElementFactory *fact = (GstElementFactory *) tmp->data;
981       if (gst_element_factory_list_is_type (fact,
982               GST_ELEMENT_FACTORY_TYPE_DECODER))
983         dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
984       else
985         dbin->decodable_factories =
986             g_list_append (dbin->decodable_factories, fact);
987     }
988   }
989 }
990
991 /* Must be called with appropriate lock if list is a protected variable */
992 static const gchar *
993 stream_in_list (GList * list, const gchar * sid)
994 {
995   GList *tmp;
996
997 #if EXTRA_DEBUG
998   for (tmp = list; tmp; tmp = tmp->next) {
999     gchar *osid = (gchar *) tmp->data;
1000     GST_DEBUG ("Checking %s against %s", sid, osid);
1001   }
1002 #endif
1003
1004   for (tmp = list; tmp; tmp = tmp->next) {
1005     const gchar *osid = (gchar *) tmp->data;
1006     if (!g_strcmp0 (sid, osid))
1007       return osid;
1008   }
1009
1010   return NULL;
1011 }
1012
1013 static void
1014 update_requested_selection (GstDecodebin3 * dbin,
1015     GstStreamCollection * collection)
1016 {
1017   guint i, nb;
1018   GList *tmp = NULL;
1019   GstStreamType used_types = 0;
1020
1021   nb = gst_stream_collection_get_size (collection);
1022
1023   /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1024    *  the switch handler will take care of the pending selection */
1025   SELECTION_LOCK (dbin);
1026   if (dbin->pending_select_streams) {
1027     GST_DEBUG_OBJECT (dbin,
1028         "No need to create pending selection, SELECT_STREAMS underway");
1029     goto beach;
1030   }
1031
1032   /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1033   GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1034
1035   /* 3. If not, check if we already have some of the streams in the
1036    * existing active/requested selection */
1037   for (i = 0; i < nb; i++) {
1038     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1039     const gchar *sid = gst_stream_get_stream_id (stream);
1040     gint request = -1;
1041     /* Fire select-stream signal to see if outside components want to
1042      * hint at which streams should be selected */
1043     g_signal_emit (G_OBJECT (dbin),
1044         gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1045         &request);
1046     GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1047     if (request == 1 || (request == -1
1048             && (stream_in_list (dbin->requested_selection, sid)
1049                 || stream_in_list (dbin->active_selection, sid)))) {
1050       GstStreamType curtype = gst_stream_get_stream_type (stream);
1051       if (request == 1)
1052         GST_DEBUG_OBJECT (dbin,
1053             "Using stream requested by 'select-stream' signal : %s", sid);
1054       else
1055         GST_DEBUG_OBJECT (dbin,
1056             "Re-using stream already present in requested or active selection : %s",
1057             sid);
1058       tmp = g_list_append (tmp, (gchar *) sid);
1059       used_types |= curtype;
1060     }
1061   }
1062
1063   /* 4. If not, match one stream of each type */
1064   for (i = 0; i < nb; i++) {
1065     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1066     GstStreamType curtype = gst_stream_get_stream_type (stream);
1067     if (!(used_types & curtype)) {
1068       const gchar *sid = gst_stream_get_stream_id (stream);
1069       GST_DEBUG_OBJECT (dbin, "Selecting stream '%s' of type %s",
1070           sid, gst_stream_type_get_name (curtype));
1071       tmp = g_list_append (tmp, (gchar *) sid);
1072       used_types |= curtype;
1073     }
1074   }
1075
1076 beach:
1077   /* Finally set the requested selection */
1078   if (tmp) {
1079     if (dbin->requested_selection) {
1080       GST_FIXME_OBJECT (dbin,
1081           "Replacing non-NULL requested_selection, what should we do ??");
1082       g_list_free_full (dbin->requested_selection, g_free);
1083     }
1084     dbin->requested_selection =
1085         g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1086     dbin->selection_updated = TRUE;
1087     g_list_free (tmp);
1088   }
1089   SELECTION_UNLOCK (dbin);
1090 }
1091
1092 /* Call with INPUT_LOCK taken */
1093 static GstStreamCollection *
1094 get_merged_collection (GstDecodebin3 * dbin)
1095 {
1096   gboolean needs_merge = FALSE;
1097   GstStreamCollection *res = NULL;
1098   GList *tmp;
1099   guint i, nb_stream;
1100
1101   /* First check if we need to do a merge or just return the only collection */
1102   res = dbin->main_input->collection;
1103
1104   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1105     DecodebinInput *input = (DecodebinInput *) tmp->data;
1106     if (input->collection) {
1107       if (res) {
1108         needs_merge = TRUE;
1109         break;
1110       }
1111       res = input->collection;
1112     }
1113   }
1114
1115   if (!needs_merge) {
1116     GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1117     return res ? gst_object_ref (res) : NULL;
1118   }
1119
1120   /* We really need to create a new collection */
1121   /* FIXME : Some numbering scheme maybe ?? */
1122   res = gst_stream_collection_new ("decodebin3");
1123   if (dbin->main_input->collection) {
1124     nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1125     GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1126     for (i = 0; i < nb_stream; i++) {
1127       GstStream *stream =
1128           gst_stream_collection_get_stream (dbin->main_input->collection, i);
1129       gst_stream_collection_add_stream (res, gst_object_ref (stream));
1130     }
1131   }
1132
1133   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1134     DecodebinInput *input = (DecodebinInput *) tmp->data;
1135     GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1136         input->collection);
1137     if (input->collection) {
1138       nb_stream = gst_stream_collection_get_size (input->collection);
1139       GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1140       for (i = 0; i < nb_stream; i++) {
1141         GstStream *stream =
1142             gst_stream_collection_get_stream (input->collection, i);
1143         gst_stream_collection_add_stream (res, gst_object_ref (stream));
1144       }
1145     }
1146   }
1147
1148   return res;
1149 }
1150
1151 /* Call with INPUT_LOCK taken */
1152 static DecodebinInput *
1153 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1154 {
1155   DecodebinInput *input = NULL;
1156   GstElement *parent = gst_object_ref (child);
1157   GList *tmp;
1158
1159   do {
1160     GstElement *next_parent;
1161
1162     GST_DEBUG_OBJECT (dbin, "parent %s",
1163         parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1164
1165     if (parent == dbin->main_input->parsebin) {
1166       input = dbin->main_input;
1167       break;
1168     }
1169     for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1170       DecodebinInput *cur = (DecodebinInput *) tmp->data;
1171       if (parent == cur->parsebin) {
1172         input = cur;
1173         break;
1174       }
1175     }
1176     next_parent = (GstElement *) gst_element_get_parent (parent);
1177     gst_object_unref (parent);
1178     parent = next_parent;
1179
1180   } while (parent && parent != (GstElement *) dbin);
1181
1182   if (parent)
1183     gst_object_unref (parent);
1184
1185   return input;
1186 }
1187
1188 static const gchar *
1189 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1190 {
1191   guint i, len;
1192
1193   if (dbin->collection == NULL)
1194     return NULL;
1195   len = gst_stream_collection_get_size (dbin->collection);
1196   for (i = 0; i < len; i++) {
1197     GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1198     const gchar *osid = gst_stream_get_stream_id (stream);
1199     if (!g_strcmp0 (sid, osid))
1200       return osid;
1201   }
1202
1203   return NULL;
1204 }
1205
1206 /* Call with INPUT_LOCK taken */
1207 static void
1208 handle_stream_collection (GstDecodebin3 * dbin,
1209     GstStreamCollection * collection, GstElement * child)
1210 {
1211 #ifndef GST_DISABLE_GST_DEBUG
1212   const gchar *upstream_id;
1213   guint i;
1214 #endif
1215   DecodebinInput *input = find_message_parsebin (dbin, child);
1216
1217   if (!input) {
1218     GST_DEBUG_OBJECT (dbin,
1219         "Couldn't find corresponding input, most likely shutting down");
1220     return;
1221   }
1222
1223   /* Replace collection in input */
1224   if (input->collection)
1225     gst_object_unref (input->collection);
1226   input->collection = gst_object_ref (collection);
1227   GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1228       input);
1229
1230   /* Merge collection if needed */
1231   collection = get_merged_collection (dbin);
1232
1233 #ifndef GST_DISABLE_GST_DEBUG
1234   /* Just some debugging */
1235   upstream_id = gst_stream_collection_get_upstream_id (collection);
1236   GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1237   GST_DEBUG ("From input %p", input);
1238   GST_DEBUG ("  %d streams", gst_stream_collection_get_size (collection));
1239   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1240     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1241     GstTagList *taglist;
1242     GstCaps *caps;
1243
1244     GST_DEBUG ("   Stream '%s'", gst_stream_get_stream_id (stream));
1245     GST_DEBUG ("     type  : %s",
1246         gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1247     GST_DEBUG ("     flags : 0x%x", gst_stream_get_stream_flags (stream));
1248     taglist = gst_stream_get_tags (stream);
1249     GST_DEBUG ("     tags  : %" GST_PTR_FORMAT, taglist);
1250     caps = gst_stream_get_caps (stream);
1251     GST_DEBUG ("     caps  : %" GST_PTR_FORMAT, caps);
1252     if (taglist)
1253       gst_tag_list_unref (taglist);
1254     if (caps)
1255       gst_caps_unref (caps);
1256   }
1257 #endif
1258
1259   /* Store collection for later usage */
1260   if (dbin->collection == NULL) {
1261     dbin->collection = collection;
1262   } else {
1263     /* We need to check who emitted this collection (the owner).
1264      * If we already had a collection from that user, this one is an update,
1265      * that is to say that we need to figure out how we are going to re-use
1266      * the streams/slot */
1267     GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1268     /* FIXME : When do we switch from pending collection to active collection ?
1269      * When all streams from active collection are drained in multiqueue output ? */
1270     gst_object_unref (dbin->collection);
1271     dbin->collection = collection;
1272     /* dbin->pending_collection = */
1273     /*     g_list_append (dbin->pending_collection, collection); */
1274   }
1275 }
1276
1277 static void
1278 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1279 {
1280   GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1281   gboolean posting_collection = FALSE;
1282
1283   GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1284
1285   switch (GST_MESSAGE_TYPE (message)) {
1286     case GST_MESSAGE_STREAM_COLLECTION:
1287     {
1288       GstStreamCollection *collection = NULL;
1289       gst_message_parse_stream_collection (message, &collection);
1290       if (collection) {
1291         INPUT_LOCK (dbin);
1292         handle_stream_collection (dbin, collection,
1293             (GstElement *) GST_MESSAGE_SRC (message));
1294         posting_collection = TRUE;
1295         INPUT_UNLOCK (dbin);
1296       }
1297       if (dbin->collection && collection != dbin->collection) {
1298         /* Replace collection message, we most likely aggregated it */
1299         GstMessage *new_msg;
1300         new_msg =
1301             gst_message_new_stream_collection ((GstObject *) dbin,
1302             dbin->collection);
1303         gst_message_unref (message);
1304         message = new_msg;
1305       }
1306       if (collection)
1307         gst_object_unref (collection);
1308       break;
1309     }
1310     default:
1311       break;
1312   }
1313
1314   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1315
1316   if (posting_collection) {
1317     /* Figure out a selection for that collection */
1318     update_requested_selection (dbin, dbin->collection);
1319   }
1320 }
1321
1322 static DecodebinOutputStream *
1323 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1324 {
1325   GList *tmp;
1326   GstStreamType stype = gst_stream_get_stream_type (stream);
1327
1328   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1329     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1330     if (output->type == stype && output->slot && output->slot->active_stream) {
1331       GstStream *tstream = output->slot->active_stream;
1332       if (!stream_in_list (dbin->requested_selection,
1333               (gchar *) gst_stream_get_stream_id (tstream))) {
1334         return output;
1335       }
1336     }
1337   }
1338
1339   return NULL;
1340 }
1341
1342 /* Give a certain slot, figure out if it should be linked to an
1343  * output stream
1344  * CALL WITH SELECTION LOCK TAKEN !*/
1345 static DecodebinOutputStream *
1346 get_output_for_slot (MultiQueueSlot * slot)
1347 {
1348   GstDecodebin3 *dbin = slot->dbin;
1349   DecodebinOutputStream *output = NULL;
1350   const gchar *stream_id;
1351   GstCaps *caps;
1352   gchar *id_in_list = NULL;
1353
1354   /* If we already have a configured output, just use it */
1355   if (slot->output != NULL)
1356     return slot->output;
1357
1358   /*
1359    * FIXME
1360    *
1361    * This method needs to be split into multiple parts
1362    *
1363    * 1) Figure out whether stream should be exposed or not
1364    *   This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1365    *   in the default stream attribution
1366    *
1367    * 2) Figure out whether an output stream should be created, whether
1368    *   we can re-use the output stream already linked to the slot, or
1369    *   whether we need to get re-assigned another (currently used) output
1370    *   stream.
1371    */
1372
1373   stream_id = gst_stream_get_stream_id (slot->active_stream);
1374   caps = gst_stream_get_caps (slot->active_stream);
1375   GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
1376   gst_caps_unref (caps);
1377
1378   /* 0. Emit autoplug-continue signal for pending caps ? */
1379   GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
1380
1381   /* 1. if in EXPOSE_ALL_MODE, just accept */
1382   GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
1383
1384 #if 0
1385   /* FIXME : The idea around this was to avoid activating a stream for
1386    *     which we have no decoder. Unfortunately it is way too
1387    *     expensive. Need to figure out a better solution */
1388   /* 2. Is there a potential decoder (if one is required) */
1389   if (!gst_caps_can_intersect (caps, dbin->caps)
1390       && !have_factory (dbin, (GstCaps *) caps,
1391           GST_ELEMENT_FACTORY_TYPE_DECODER)) {
1392     GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
1393         caps);
1394     SELECTION_UNLOCK (dbin);
1395     gst_element_post_message (GST_ELEMENT_CAST (dbin),
1396         gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1397     SELECTION_LOCK (dbin);
1398     return NULL;
1399   }
1400 #endif
1401
1402   /* 3. In default mode check if we should expose */
1403   id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
1404   if (id_in_list) {
1405     /* Check if we can steal an existing output stream we could re-use.
1406      * that is:
1407      * * an output stream whose slot->stream is not in requested
1408      * * and is of the same type as this stream
1409      */
1410     output = find_free_compatible_output (dbin, slot->active_stream);
1411     if (output) {
1412       /* Move this output from its current slot to this slot */
1413       dbin->to_activate =
1414           g_list_append (dbin->to_activate, (gchar *) stream_id);
1415       dbin->requested_selection =
1416           g_list_remove (dbin->requested_selection, id_in_list);
1417       g_free (id_in_list);
1418       SELECTION_UNLOCK (dbin);
1419       gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1420           (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
1421       SELECTION_LOCK (dbin);
1422       return NULL;
1423     }
1424
1425     output = create_output_stream (dbin, slot->type);
1426     output->slot = slot;
1427     GST_DEBUG ("Linking slot %p to new output %p", slot, output);
1428     slot->output = output;
1429     dbin->active_selection =
1430         g_list_append (dbin->active_selection, (gchar *) stream_id);
1431   } else
1432     GST_DEBUG ("Not creating any output for slot %p", slot);
1433
1434   return output;
1435 }
1436
1437 /* Returns SELECTED_STREAMS message if active_selection is equal to
1438  * requested_selection, else NULL.
1439  * Must be called with LOCK taken */
1440 static GstMessage *
1441 is_selection_done (GstDecodebin3 * dbin)
1442 {
1443   GList *tmp;
1444   GstMessage *msg;
1445
1446   if (!dbin->selection_updated)
1447     return NULL;
1448
1449   GST_LOG_OBJECT (dbin, "Checking");
1450
1451   if (dbin->to_activate != NULL) {
1452     GST_DEBUG ("Still have streams to activate");
1453     return NULL;
1454   }
1455   for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
1456     GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
1457     if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
1458       GST_DEBUG ("Not in active selection, returning");
1459       return NULL;
1460     }
1461   }
1462
1463   GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
1464
1465   /* We are completely active */
1466   msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
1467   GST_MESSAGE_SEQNUM (msg) = dbin->select_streams_seqnum;
1468   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1469     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1470     if (output->slot) {
1471       GST_DEBUG_OBJECT (dbin, "Adding stream %s",
1472           gst_stream_get_stream_id (output->slot->active_stream));
1473
1474       gst_message_streams_selected_add (msg, output->slot->active_stream);
1475     } else
1476       GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
1477   }
1478   dbin->selection_updated = FALSE;
1479   return msg;
1480 }
1481
1482 /* Must be called with SELECTION_LOCK taken */
1483 static void
1484 check_all_slot_for_eos (GstDecodebin3 * dbin)
1485 {
1486   gboolean all_drained = TRUE;
1487   GList *iter;
1488
1489   GST_DEBUG_OBJECT (dbin, "check slot for eos");
1490
1491   for (iter = dbin->slots; iter; iter = iter->next) {
1492     MultiQueueSlot *slot = iter->data;
1493
1494     if (!slot->output)
1495       continue;
1496
1497     if (slot->is_drained) {
1498       GST_DEBUG_OBJECT (slot->sink_pad, "slot %p is draned", slot);
1499       continue;
1500     }
1501
1502     all_drained = FALSE;
1503     break;
1504   }
1505
1506   if (all_drained) {
1507     INPUT_LOCK (dbin);
1508     if (!pending_pads_are_eos (dbin->main_input))
1509       all_drained = FALSE;
1510
1511     if (all_drained) {
1512       for (iter = dbin->other_inputs; iter; iter = iter->next) {
1513         if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
1514           all_drained = FALSE;
1515           break;
1516         }
1517       }
1518     }
1519     INPUT_UNLOCK (dbin);
1520   }
1521
1522   if (all_drained) {
1523     GST_DEBUG_OBJECT (dbin,
1524         "All active slots are drained, and no pending input, push EOS");
1525
1526     for (iter = dbin->input_streams; iter; iter = iter->next) {
1527       DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
1528       GstPad *peer = gst_pad_get_peer (input->srcpad);
1529
1530       /* Send EOS and then remove elements */
1531       if (peer) {
1532         gst_pad_send_event (peer, gst_event_new_eos ());
1533         gst_object_unref (peer);
1534       }
1535     }
1536   }
1537 }
1538
1539 static GstPadProbeReturn
1540 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
1541     MultiQueueSlot * slot)
1542 {
1543   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
1544   GstDecodebin3 *dbin = slot->dbin;
1545
1546   if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
1547     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
1548
1549     GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
1550     switch (GST_EVENT_TYPE (ev)) {
1551       case GST_EVENT_STREAM_START:
1552       {
1553         GstStream *stream = NULL;
1554         const gchar *stream_id;
1555
1556         gst_event_parse_stream (ev, &stream);
1557         if (stream == NULL) {
1558           GST_ERROR_OBJECT (pad,
1559               "Got a STREAM_START event without a GstStream");
1560           break;
1561         }
1562         slot->is_drained = FALSE;
1563         stream_id = gst_stream_get_stream_id (stream);
1564         GST_DEBUG_OBJECT (pad, "Stream Start '%s'", stream_id);
1565         if (slot->active_stream == NULL) {
1566           slot->active_stream = stream;
1567         } else if (slot->active_stream != stream) {
1568           GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
1569               gst_stream_get_stream_id (slot->active_stream),
1570               gst_stream_get_stream_id (stream));
1571           gst_object_unref (slot->active_stream);
1572           slot->active_stream = stream;
1573         } else
1574           gst_object_unref (stream);
1575 #if 0                           /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
1576         {
1577           gboolean is_active, is_requested;
1578           /* Quick check to see if we're in the current selection */
1579           /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
1580           SELECTION_LOCK (dbin);
1581           GST_DEBUG_OBJECT (dbin, "Checking active selection");
1582           is_active = stream_in_list (dbin->active_selection, stream_id);
1583           GST_DEBUG_OBJECT (dbin, "Checking requested selection");
1584           is_requested = stream_in_list (dbin->requested_selection, stream_id);
1585           SELECTION_UNLOCK (dbin);
1586           if (is_active)
1587             GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
1588                 slot->output);
1589           if (is_requested)
1590             GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
1591                 slot->output);
1592           else if (slot->output) {
1593             GST_DEBUG_OBJECT (pad,
1594                 "Slot needs to be deactivated ? It's no longer in requested selection");
1595           } else if (!is_active)
1596             GST_DEBUG_OBJECT (pad,
1597                 "Slot in neither active nor requested selection");
1598         }
1599 #endif
1600       }
1601         break;
1602       case GST_EVENT_CAPS:
1603       {
1604         /* Configure the output slot if needed */
1605         DecodebinOutputStream *output;
1606         GstMessage *msg = NULL;
1607         SELECTION_LOCK (dbin);
1608         output = get_output_for_slot (slot);
1609         if (output) {
1610           reconfigure_output_stream (output, slot);
1611           msg = is_selection_done (dbin);
1612         }
1613         SELECTION_UNLOCK (dbin);
1614         if (msg)
1615           gst_element_post_message ((GstElement *) slot->dbin, msg);
1616       }
1617         break;
1618       case GST_EVENT_EOS:
1619         /* FIXME : Figure out */
1620         GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
1621             slot->input);
1622         slot->is_drained = TRUE;
1623         if (slot->input == NULL) {
1624           GstPad *peer;
1625           GST_DEBUG_OBJECT (pad,
1626               "last EOS for input, forwarding and removing slot");
1627           peer = gst_pad_get_peer (pad);
1628           if (peer) {
1629             gst_pad_send_event (peer, ev);
1630             gst_object_unref (peer);
1631           } else {
1632             gst_event_unref (ev);
1633           }
1634           SELECTION_LOCK (dbin);
1635           /* FIXME : Shouldn't we try to re-assign the output instead of just
1636            * removing it ? */
1637           /* Remove the output */
1638           if (slot->output) {
1639             DecodebinOutputStream *output = slot->output;
1640             dbin->output_streams = g_list_remove (dbin->output_streams, output);
1641             free_output_stream (dbin, output);
1642           }
1643           slot->probe_id = 0;
1644           dbin->slots = g_list_remove (dbin->slots, slot);
1645           free_multiqueue_slot_async (dbin, slot);
1646           SELECTION_UNLOCK (dbin);
1647           ret = GST_PAD_PROBE_REMOVE;
1648         }
1649         break;
1650       case GST_EVENT_CUSTOM_DOWNSTREAM:
1651         if (gst_event_has_name (ev, "decodebin3-custom-eos")) {
1652           slot->is_drained = TRUE;
1653           ret = GST_PAD_PROBE_DROP;
1654           SELECTION_LOCK (dbin);
1655           if (slot->input == NULL) {
1656             GST_DEBUG_OBJECT (pad,
1657                 "Got custom-eos from null input stream, remove output stream");
1658             /* Remove the output */
1659             if (slot->output) {
1660               DecodebinOutputStream *output = slot->output;
1661               dbin->output_streams =
1662                   g_list_remove (dbin->output_streams, output);
1663               free_output_stream (dbin, output);
1664             }
1665             slot->probe_id = 0;
1666             dbin->slots = g_list_remove (dbin->slots, slot);
1667             free_multiqueue_slot_async (dbin, slot);
1668             ret = GST_PAD_PROBE_REMOVE;
1669           } else {
1670             check_all_slot_for_eos (dbin);
1671           }
1672           SELECTION_UNLOCK (dbin);
1673         }
1674         break;
1675       default:
1676         break;
1677     }
1678   } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
1679     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
1680     switch (GST_QUERY_TYPE (query)) {
1681       case GST_QUERY_CAPS:
1682       {
1683         GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
1684         gst_query_set_caps_result (query, GST_CAPS_ANY);
1685         ret = GST_PAD_PROBE_HANDLED;
1686       }
1687         break;
1688
1689       case GST_QUERY_ACCEPT_CAPS:
1690       {
1691         GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
1692         /* If the current decoder doesn't accept caps, we'll reconfigure
1693          * on the actual caps event. So accept any caps. */
1694         gst_query_set_accept_caps_result (query, TRUE);
1695         ret = GST_PAD_PROBE_HANDLED;
1696       }
1697       default:
1698         break;
1699     }
1700   }
1701
1702   return ret;
1703 }
1704
1705 /* Create a new multiqueue slot for the given type
1706  *
1707  * It is up to the caller to know whether that slot is needed or not
1708  * (and release it when no longer needed) */
1709 static MultiQueueSlot *
1710 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
1711 {
1712   MultiQueueSlot *slot;
1713   GstIterator *it = NULL;
1714   GValue item = { 0, };
1715
1716   GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
1717       gst_stream_type_get_name (type));
1718   slot = g_new0 (MultiQueueSlot, 1);
1719   slot->dbin = dbin;
1720   slot->id = dbin->slot_id++;
1721   slot->type = type;
1722   slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u");
1723   if (slot->sink_pad == NULL)
1724     goto fail;
1725   it = gst_pad_iterate_internal_links (slot->sink_pad);
1726   if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
1727       || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
1728     GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
1729         GST_DEBUG_PAD_NAME (slot->src_pad));
1730     goto fail;
1731   }
1732   gst_iterator_free (it);
1733   g_value_reset (&item);
1734
1735   g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
1736
1737   /* Add event probe */
1738   slot->probe_id =
1739       gst_pad_add_probe (slot->src_pad,
1740       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
1741       (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
1742
1743   GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
1744       GST_DEBUG_PAD_NAME (slot->src_pad));
1745   dbin->slots = g_list_append (dbin->slots, slot);
1746   return slot;
1747
1748   /* ERRORS */
1749 fail:
1750   {
1751     if (slot->sink_pad)
1752       gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
1753     g_free (slot);
1754     return NULL;
1755   }
1756 }
1757
1758 /* Must be called with SELECTION_LOCK */
1759 static MultiQueueSlot *
1760 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
1761 {
1762   GList *tmp;
1763   MultiQueueSlot *empty_slot = NULL;
1764   GstStreamType input_type = 0;
1765   gchar *stream_id = NULL;
1766
1767   GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
1768       input, input->active_stream,
1769       input->
1770       active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
1771
1772   if (input->active_stream) {
1773     input_type = gst_stream_get_stream_type (input->active_stream);
1774     stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
1775   }
1776
1777   /* Go over existing slots and check if there is already one for it */
1778   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1779     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1780     /* Already used input, return that one */
1781     if (slot->input == input) {
1782       GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
1783       return slot;
1784     }
1785   }
1786
1787   /* Go amongst all unused slots of the right type and try to find a candidate */
1788   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1789     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1790     if (slot->input == NULL && input_type == slot->type) {
1791       /* Remember this empty slot for later */
1792       empty_slot = slot;
1793       /* Check if available slot is of the same stream_id */
1794       GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
1795           slot->id, slot->active_stream);
1796       if (stream_id && slot->active_stream) {
1797         gchar *ostream_id =
1798             (gchar *) gst_stream_get_stream_id (slot->active_stream);
1799         GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
1800             ostream_id, stream_id);
1801         if (!g_strcmp0 (stream_id, ostream_id))
1802           break;
1803       }
1804     }
1805   }
1806
1807   if (empty_slot) {
1808     GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
1809     empty_slot->input = input;
1810     return empty_slot;
1811   }
1812
1813   if (input_type)
1814     return create_new_slot (dbin, input_type);
1815
1816   return NULL;
1817 }
1818
1819 static void
1820 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
1821 {
1822   if (slot->input != NULL && slot->input != input) {
1823     GST_ERROR_OBJECT (slot->dbin,
1824         "Trying to link input to an already used slot");
1825     return;
1826   }
1827   gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
1828   slot->pending_stream = input->active_stream;
1829   slot->input = input;
1830 }
1831
1832 #if 0
1833 static gboolean
1834 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
1835     GstElementFactoryListType ftype)
1836 {
1837   gboolean ret = FALSE;
1838   GList *res;
1839
1840   g_mutex_lock (&dbin->factories_lock);
1841   gst_decode_bin_update_factories_list (dbin);
1842   if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
1843     res =
1844         gst_element_factory_list_filter (dbin->decoder_factories,
1845         caps, GST_PAD_SINK, TRUE);
1846   else
1847     res =
1848         gst_element_factory_list_filter (dbin->decodable_factories,
1849         caps, GST_PAD_SINK, TRUE);
1850   g_mutex_unlock (&dbin->factories_lock);
1851
1852   if (res) {
1853     ret = TRUE;
1854     gst_plugin_feature_list_free (res);
1855   }
1856
1857   return ret;
1858 }
1859 #endif
1860
1861 static GstElement *
1862 create_element (GstDecodebin3 * dbin, GstStream * stream,
1863     GstElementFactoryListType ftype)
1864 {
1865   GList *res;
1866   GstElement *element = NULL;
1867   GstCaps *caps;
1868
1869   g_mutex_lock (&dbin->factories_lock);
1870   gst_decode_bin_update_factories_list (dbin);
1871   caps = gst_stream_get_caps (stream);
1872   if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
1873     res =
1874         gst_element_factory_list_filter (dbin->decoder_factories,
1875         caps, GST_PAD_SINK, TRUE);
1876   else
1877     res =
1878         gst_element_factory_list_filter (dbin->decodable_factories,
1879         caps, GST_PAD_SINK, TRUE);
1880   g_mutex_unlock (&dbin->factories_lock);
1881
1882   if (res) {
1883     element =
1884         gst_element_factory_create ((GstElementFactory *) res->data, NULL);
1885     GST_DEBUG ("Created element '%s'", GST_ELEMENT_NAME (element));
1886     gst_plugin_feature_list_free (res);
1887   } else {
1888     GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT, caps);
1889   }
1890
1891   gst_caps_unref (caps);
1892   return element;
1893 }
1894
1895 /* FIXME : VERY NAIVE. ASSUMING FIRST ONE WILL WORK */
1896 static GstElement *
1897 create_decoder (GstDecodebin3 * dbin, GstStream * stream)
1898 {
1899   return create_element (dbin, stream, GST_ELEMENT_FACTORY_TYPE_DECODER);
1900 }
1901
1902 static GstPadProbeReturn
1903 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
1904     DecodebinOutputStream * output)
1905 {
1906   GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
1907   /* If we have a keyframe, remove the probe and let all data through */
1908   /* FIXME : HANDLE HEADER BUFFER ?? */
1909   if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1910       GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
1911     GST_DEBUG_OBJECT (pad,
1912         "Buffer is keyframe or header, letting through and removing probe");
1913     output->drop_probe_id = 0;
1914     return GST_PAD_PROBE_REMOVE;
1915   }
1916   GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
1917   return GST_PAD_PROBE_DROP;
1918 }
1919
1920 static void
1921 reconfigure_output_stream (DecodebinOutputStream * output,
1922     MultiQueueSlot * slot)
1923 {
1924   GstDecodebin3 *dbin = output->dbin;
1925   GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
1926   gboolean needs_decoder;
1927
1928   needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
1929
1930   GST_DEBUG_OBJECT (dbin,
1931       "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
1932       needs_decoder);
1933
1934   /* FIXME : Maybe make the output un-hook itself automatically ? */
1935   if (output->slot != NULL && output->slot != slot) {
1936     GST_WARNING_OBJECT (dbin,
1937         "Output still linked to another slot (%p)", output->slot);
1938     gst_caps_unref (new_caps);
1939     return;
1940   }
1941
1942   /* Check if existing config is reusable as-is by checking if
1943    * the existing decoder accepts the new caps, if not delete
1944    * it and create a new one */
1945   if (output->decoder) {
1946     gboolean can_reuse_decoder;
1947
1948     if (needs_decoder) {
1949       can_reuse_decoder =
1950           gst_pad_query_accept_caps (output->decoder_sink, new_caps);
1951     } else
1952       can_reuse_decoder = FALSE;
1953
1954     if (can_reuse_decoder) {
1955       if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
1956         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
1957         output->drop_probe_id =
1958             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
1959             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
1960       }
1961       GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
1962       if (output->linked == FALSE) {
1963         gst_pad_link_full (slot->src_pad, output->decoder_sink,
1964             GST_PAD_LINK_CHECK_NOTHING);
1965         output->linked = TRUE;
1966       }
1967       gst_caps_unref (new_caps);
1968       return;
1969     }
1970
1971     GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
1972
1973     if (output->linked)
1974       gst_pad_unlink (slot->src_pad, output->decoder_sink);
1975     output->linked = FALSE;
1976     if (output->drop_probe_id) {
1977       gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
1978       output->drop_probe_id = 0;
1979     }
1980
1981     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
1982       GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
1983       gst_caps_unref (new_caps);
1984       goto cleanup;
1985     }
1986
1987     gst_element_set_locked_state (output->decoder, TRUE);
1988     gst_element_set_state (output->decoder, GST_STATE_NULL);
1989
1990     gst_bin_remove ((GstBin *) dbin, output->decoder);
1991     output->decoder = NULL;
1992   }
1993
1994   gst_caps_unref (new_caps);
1995
1996   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
1997   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
1998
1999   /* If a decoder is required, create one */
2000   if (needs_decoder) {
2001     /* If we don't have a decoder yet, instantiate one */
2002     output->decoder = create_decoder (dbin, slot->active_stream);
2003     if (output->decoder == NULL) {
2004       GstCaps *caps;
2005
2006       SELECTION_UNLOCK (dbin);
2007       /* FIXME : Should we be smarter if there's a missing decoder ?
2008        * Should we deactivate that stream ? */
2009       caps = gst_stream_get_caps (slot->active_stream);
2010       gst_element_post_message (GST_ELEMENT_CAST (dbin),
2011           gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2012       gst_caps_unref (caps);
2013       SELECTION_LOCK (dbin);
2014       goto cleanup;
2015     }
2016     if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
2017       GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
2018       goto cleanup;
2019     }
2020     output->decoder_sink = gst_element_get_static_pad (output->decoder, "sink");
2021     output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
2022     if (output->type & GST_STREAM_TYPE_VIDEO) {
2023       GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2024       output->drop_probe_id =
2025           gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2026           (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2027     }
2028     if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
2029             GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2030       GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
2031           GST_DEBUG_PAD_NAME (output->decoder_sink));
2032       goto cleanup;
2033     }
2034   } else {
2035     output->decoder_src = gst_object_ref (slot->src_pad);
2036     output->decoder_sink = NULL;
2037   }
2038   output->linked = TRUE;
2039   if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
2040           output->decoder_src)) {
2041     GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
2042     goto cleanup;
2043   }
2044   if (output->src_exposed == FALSE) {
2045     output->src_exposed = TRUE;
2046     gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
2047   }
2048
2049   if (output->decoder)
2050     gst_element_sync_state_with_parent (output->decoder);
2051
2052   output->slot = slot;
2053   return;
2054
2055 cleanup:
2056   {
2057     GST_DEBUG_OBJECT (dbin, "Cleanup");
2058     if (output->decoder_sink) {
2059       gst_object_unref (output->decoder_sink);
2060       output->decoder_sink = NULL;
2061     }
2062     if (output->decoder_src) {
2063       gst_object_unref (output->decoder_src);
2064       output->decoder_src = NULL;
2065     }
2066     if (output->decoder) {
2067       gst_element_set_state (output->decoder, GST_STATE_NULL);
2068       gst_bin_remove ((GstBin *) dbin, output->decoder);
2069       output->decoder = NULL;
2070     }
2071   }
2072 }
2073
2074 static GstPadProbeReturn
2075 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2076 {
2077   GstMessage *msg = NULL;
2078   DecodebinOutputStream *output;
2079
2080   SELECTION_LOCK (slot->dbin);
2081   output = get_output_for_slot (slot);
2082
2083   GST_DEBUG_OBJECT (pad, "output : %p", output);
2084
2085   if (output) {
2086     reconfigure_output_stream (output, slot);
2087     msg = is_selection_done (slot->dbin);
2088   }
2089   SELECTION_UNLOCK (slot->dbin);
2090   if (msg)
2091     gst_element_post_message ((GstElement *) slot->dbin, msg);
2092
2093   return GST_PAD_PROBE_REMOVE;
2094 }
2095
2096 static MultiQueueSlot *
2097 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2098 {
2099   GList *tmp;
2100
2101   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2102     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2103     const gchar *stream_id;
2104     if (slot->active_stream) {
2105       stream_id = gst_stream_get_stream_id (slot->active_stream);
2106       if (!g_strcmp0 (sid, stream_id))
2107         return slot;
2108     }
2109     if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2110       stream_id = gst_stream_get_stream_id (slot->pending_stream);
2111       if (!g_strcmp0 (sid, stream_id))
2112         return slot;
2113     }
2114   }
2115
2116   return NULL;
2117 }
2118
2119 /* This function handles the reassignment of a slot. Call this from
2120  * the streaming thread of a slot. */
2121 static gboolean
2122 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2123 {
2124   DecodebinOutputStream *output;
2125   MultiQueueSlot *target_slot = NULL;
2126   GList *tmp;
2127   const gchar *sid, *tsid;
2128
2129   SELECTION_LOCK (dbin);
2130   output = slot->output;
2131
2132   if (G_UNLIKELY (slot->active_stream == NULL)) {
2133     GST_DEBUG_OBJECT (slot->src_pad,
2134         "Called on inactive slot (active_stream == NULL)");
2135     SELECTION_UNLOCK (dbin);
2136     return FALSE;
2137   }
2138
2139   if (G_UNLIKELY (output == NULL)) {
2140     GST_DEBUG_OBJECT (slot->src_pad,
2141         "Slot doesn't have any output to be removed");
2142     SELECTION_UNLOCK (dbin);
2143     return FALSE;
2144   }
2145
2146   sid = gst_stream_get_stream_id (slot->active_stream);
2147   GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2148
2149   /* Recheck whether this stream is still in the list of streams to deactivate */
2150   if (stream_in_list (dbin->requested_selection, sid)) {
2151     /* Stream is in the list of requested streams, don't remove */
2152     SELECTION_UNLOCK (dbin);
2153     GST_DEBUG_OBJECT (slot->src_pad,
2154         "Stream '%s' doesn't need to be deactivated", sid);
2155     return FALSE;
2156   }
2157
2158   /* Unlink slot from output */
2159   /* FIXME : Handle flushing ? */
2160   /* FIXME : Handle outputs without decoders */
2161   GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2162       output->decoder_sink);
2163   if (output->decoder_sink)
2164     gst_pad_unlink (slot->src_pad, output->decoder_sink);
2165   output->linked = FALSE;
2166   slot->output = NULL;
2167   output->slot = NULL;
2168   /* Remove sid from active selection */
2169   for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2170     if (!g_strcmp0 (sid, tmp->data)) {
2171       dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2172       break;
2173     }
2174
2175   /* Can we re-assign this output to a requested stream ? */
2176   GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2177   for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2178     MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2179     GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2180         tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2181     if (tslot && tslot->type == output->type && tslot->output == NULL) {
2182       GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2183       target_slot = tslot;
2184       tsid = tmp->data;
2185       /* Pass target stream id to requested selection */
2186       dbin->requested_selection =
2187           g_list_append (dbin->requested_selection, g_strdup (tmp->data));
2188       dbin->to_activate = g_list_remove (dbin->to_activate, tmp->data);
2189       break;
2190     }
2191   }
2192
2193   if (target_slot) {
2194     GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2195         target_slot, tsid);
2196     target_slot->output = output;
2197     output->slot = target_slot;
2198     dbin->active_selection =
2199         g_list_append (dbin->active_selection, (gchar *) tsid);
2200     SELECTION_UNLOCK (dbin);
2201
2202     /* Wakeup the target slot so that it retries to send events/buffers
2203      * thereby triggering the output reconfiguration codepath */
2204     gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2205         (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2206     /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2207   } else {
2208     GstMessage *msg;
2209
2210     dbin->output_streams = g_list_remove (dbin->output_streams, output);
2211     free_output_stream (dbin, output);
2212     msg = is_selection_done (slot->dbin);
2213     SELECTION_UNLOCK (dbin);
2214
2215     if (msg)
2216       gst_element_post_message ((GstElement *) slot->dbin, msg);
2217   }
2218
2219   return TRUE;
2220 }
2221
2222 /* Idle probe called when a slot should be unassigned from its output stream.
2223  * This is needed to ensure nothing is flowing when unlinking the slot.
2224  *
2225  * Also, this method will search for a pending stream which could re-use
2226  * the output stream. */
2227 static GstPadProbeReturn
2228 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2229     MultiQueueSlot * slot)
2230 {
2231   GstDecodebin3 *dbin = slot->dbin;
2232
2233   reassign_slot (dbin, slot);
2234
2235   return GST_PAD_PROBE_REMOVE;
2236 }
2237
2238 static gboolean
2239 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2240     guint32 seqnum)
2241 {
2242   gboolean ret = TRUE;
2243   GList *tmp;
2244   /* List of slots to (de)activate. */
2245   GList *to_deactivate = NULL;
2246   GList *to_activate = NULL;
2247   /* List of unknown stream id, most likely means the event
2248    * should be sent upstream so that elements can expose the requested stream */
2249   GList *unknown = NULL;
2250   GList *to_reassign = NULL;
2251   GList *future_request_streams = NULL;
2252   GList *pending_streams = NULL;
2253   GList *slots_to_reassign = NULL;
2254
2255   SELECTION_LOCK (dbin);
2256   if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2257     GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2258     SELECTION_UNLOCK (dbin);
2259     return TRUE;
2260   }
2261   /* Remove pending select_streams */
2262   g_list_free (dbin->pending_select_streams);
2263   dbin->pending_select_streams = NULL;
2264
2265   /* COMPARE the requested streams to the active and requested streams
2266    * on multiqueue. */
2267
2268   /* First check the slots to activate and which ones are unknown */
2269   for (tmp = select_streams; tmp; tmp = tmp->next) {
2270     const gchar *sid = (const gchar *) tmp->data;
2271     MultiQueueSlot *slot;
2272     GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
2273     slot = find_slot_for_stream_id (dbin, sid);
2274     /* Find the corresponding slot */
2275     if (slot == NULL) {
2276       if (stream_in_collection (dbin, (gchar *) sid)) {
2277         pending_streams = g_list_append (pending_streams, (gchar *) sid);
2278       } else {
2279         GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
2280         unknown = g_list_append (unknown, (gchar *) sid);
2281       }
2282     } else if (slot->output == NULL) {
2283       GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
2284           slot, sid);
2285       to_activate = g_list_append (to_activate, slot);
2286     } else {
2287       GST_DEBUG_OBJECT (dbin,
2288           "Stream '%s' from slot %p is already active on output %p", sid, slot,
2289           slot->output);
2290       future_request_streams =
2291           g_list_append (future_request_streams, (gchar *) sid);
2292     }
2293   }
2294
2295   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2296     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2297     /* For slots that have an output, check if it's part of the streams to
2298      * be active */
2299     if (slot->output) {
2300       gboolean slot_to_deactivate = TRUE;
2301
2302       if (slot->active_stream) {
2303         if (stream_in_list (select_streams,
2304                 gst_stream_get_stream_id (slot->active_stream)))
2305           slot_to_deactivate = FALSE;
2306       }
2307       if (slot_to_deactivate && slot->pending_stream
2308           && slot->pending_stream != slot->active_stream) {
2309         if (stream_in_list (select_streams,
2310                 gst_stream_get_stream_id (slot->pending_stream)))
2311           slot_to_deactivate = FALSE;
2312       }
2313       if (slot_to_deactivate) {
2314         GST_DEBUG_OBJECT (dbin,
2315             "Slot %p (%s) should be deactivated, no longer used", slot,
2316             slot->
2317             active_stream ? gst_stream_get_stream_id (slot->active_stream) :
2318             "NULL");
2319         to_deactivate = g_list_append (to_deactivate, slot);
2320       }
2321     }
2322   }
2323
2324   if (to_deactivate != NULL) {
2325     GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
2326     /* We need to compare what needs to be activated and deactivated in order
2327      * to determine whether there are outputs that can be transferred */
2328     /* Take the stream-id of the slots that are to be activated, for which there
2329      * is a slot of the same type that needs to be deactivated */
2330     tmp = to_deactivate;
2331     while (tmp) {
2332       MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
2333       gboolean removeit = FALSE;
2334       GList *tmp2, *next;
2335       GST_DEBUG_OBJECT (dbin,
2336           "Checking if slot to deactivate (%p) has a candidate slot to activate",
2337           slot_to_deactivate);
2338       for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
2339         MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
2340         GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
2341         if (slot_to_activate->type == slot_to_deactivate->type) {
2342           GST_DEBUG_OBJECT (dbin, "Re-using");
2343           to_reassign = g_list_append (to_reassign, (gchar *)
2344               gst_stream_get_stream_id (slot_to_activate->active_stream));
2345           slots_to_reassign =
2346               g_list_append (slots_to_reassign, slot_to_deactivate);
2347           to_activate = g_list_remove (to_activate, slot_to_activate);
2348           removeit = TRUE;
2349           break;
2350         }
2351       }
2352       next = tmp->next;
2353       if (removeit)
2354         to_deactivate = g_list_delete_link (to_deactivate, tmp);
2355       tmp = next;
2356     }
2357   }
2358
2359   for (tmp = to_deactivate; tmp; tmp = tmp->next) {
2360     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2361     GST_DEBUG_OBJECT (dbin,
2362         "Really need to deactivate slot %p, but no available alternative",
2363         slot);
2364
2365     slots_to_reassign = g_list_append (slots_to_reassign, slot);
2366   }
2367
2368   /* The only slots left to activate are the ones that won't be reassigned and
2369    * therefore really need to have a new output created */
2370   for (tmp = to_activate; tmp; tmp = tmp->next) {
2371     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2372     if (slot->active_stream)
2373       future_request_streams =
2374           g_list_append (future_request_streams,
2375           (gchar *) gst_stream_get_stream_id (slot->active_stream));
2376     else if (slot->pending_stream)
2377       future_request_streams =
2378           g_list_append (future_request_streams,
2379           (gchar *) gst_stream_get_stream_id (slot->pending_stream));
2380     else
2381       GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
2382   }
2383
2384   if (to_activate == NULL && pending_streams != NULL) {
2385     GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
2386     if (dbin->requested_selection)
2387       g_list_free_full (dbin->requested_selection, g_free);
2388     dbin->requested_selection =
2389         g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
2390     g_list_free (to_deactivate);
2391     g_list_free (pending_streams);
2392     to_deactivate = NULL;
2393     pending_streams = NULL;
2394   } else {
2395     if (dbin->requested_selection)
2396       g_list_free_full (dbin->requested_selection, g_free);
2397     dbin->requested_selection =
2398         g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
2399     dbin->requested_selection =
2400         g_list_concat (dbin->requested_selection,
2401         g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
2402     if (dbin->to_activate)
2403       g_list_free (dbin->to_activate);
2404     dbin->to_activate = g_list_copy (to_reassign);
2405   }
2406
2407   dbin->selection_updated = TRUE;
2408   SELECTION_UNLOCK (dbin);
2409
2410   if (unknown) {
2411     GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
2412     g_list_free (unknown);
2413   }
2414
2415   /* For all streams to deactivate, add an idle probe where we will do
2416    * the unassignment and switch over */
2417   for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
2418     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2419     gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2420         (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
2421   }
2422
2423   if (to_deactivate)
2424     g_list_free (to_deactivate);
2425   if (to_activate)
2426     g_list_free (to_activate);
2427   if (to_reassign)
2428     g_list_free (to_reassign);
2429   if (future_request_streams)
2430     g_list_free (future_request_streams);
2431   if (pending_streams)
2432     g_list_free (pending_streams);
2433   if (slots_to_reassign)
2434     g_list_free (slots_to_reassign);
2435
2436   return ret;
2437 }
2438
2439 static GstPadProbeReturn
2440 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
2441     DecodebinOutputStream * output)
2442 {
2443   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2444   GstDecodebin3 *dbin = output->dbin;
2445   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
2446
2447   GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
2448
2449   switch (GST_EVENT_TYPE (event)) {
2450     case GST_EVENT_SELECT_STREAMS:
2451     {
2452       GstPad *peer;
2453       GList *streams = NULL;
2454       guint32 seqnum = gst_event_get_seqnum (event);
2455
2456       SELECTION_LOCK (dbin);
2457       if (seqnum == dbin->select_streams_seqnum) {
2458         SELECTION_UNLOCK (dbin);
2459         GST_DEBUG_OBJECT (pad,
2460             "Already handled/handling that SELECT_STREAMS event");
2461         break;
2462       }
2463       dbin->select_streams_seqnum = seqnum;
2464       if (dbin->pending_select_streams != NULL) {
2465         GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2466         g_list_free (dbin->pending_select_streams);
2467         dbin->pending_select_streams = NULL;
2468       }
2469       gst_event_parse_select_streams (event, &streams);
2470       dbin->pending_select_streams = g_list_copy (streams);
2471       SELECTION_UNLOCK (dbin);
2472
2473       /* Send event upstream */
2474       if ((peer = gst_pad_get_peer (pad))) {
2475         gst_pad_send_event (peer, event);
2476         gst_object_unref (peer);
2477       } else {
2478         gst_event_unref (event);
2479       }
2480       /* Finally handle the switch */
2481       if (streams) {
2482         handle_stream_switch (dbin, streams, seqnum);
2483         g_list_free_full (streams, g_free);
2484       }
2485       ret = GST_PAD_PROBE_HANDLED;
2486     }
2487       break;
2488     default:
2489       break;
2490   }
2491
2492   return ret;
2493 }
2494
2495 static gboolean
2496 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
2497 {
2498   GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
2499   if (GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
2500     GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2501     GList *streams = NULL;
2502     guint32 seqnum = gst_event_get_seqnum (event);
2503
2504     SELECTION_LOCK (dbin);
2505     if (seqnum == dbin->select_streams_seqnum) {
2506       SELECTION_UNLOCK (dbin);
2507       GST_DEBUG_OBJECT (dbin,
2508           "Already handled/handling that SELECT_STREAMS event");
2509       return TRUE;
2510     }
2511     dbin->select_streams_seqnum = seqnum;
2512     if (dbin->pending_select_streams != NULL) {
2513       GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2514       g_list_free (dbin->pending_select_streams);
2515       dbin->pending_select_streams = NULL;
2516     }
2517     gst_event_parse_select_streams (event, &streams);
2518     dbin->pending_select_streams = g_list_copy (streams);
2519     SELECTION_UNLOCK (dbin);
2520
2521     /* FIXME : We don't have an upstream ?? */
2522 #if 0
2523     /* Send event upstream */
2524     if ((peer = gst_pad_get_peer (pad))) {
2525       gst_pad_send_event (peer, event);
2526       gst_object_unref (peer);
2527     }
2528 #endif
2529     /* Finally handle the switch */
2530     if (streams) {
2531       handle_stream_switch (dbin, streams, seqnum);
2532       g_list_free_full (streams, g_free);
2533     }
2534
2535     gst_event_unref (event);
2536     return TRUE;
2537   }
2538   return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
2539 }
2540
2541
2542 static void
2543 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2544 {
2545   if (slot->probe_id)
2546     gst_pad_remove_probe (slot->src_pad, slot->probe_id);
2547   if (slot->input) {
2548     if (slot->input->srcpad)
2549       gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
2550   }
2551
2552   gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2553   gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
2554   gst_object_replace ((GstObject **) & slot->src_pad, NULL);
2555   gst_object_replace ((GstObject **) & slot->active_stream, NULL);
2556   g_free (slot);
2557 }
2558
2559 static void
2560 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2561 {
2562   GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
2563   gst_element_call_async (GST_ELEMENT_CAST (dbin),
2564       (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
2565 }
2566
2567 /* Create a DecodebinOutputStream for a given type
2568  * Note: It will be empty initially, it needs to be configured
2569  * afterwards */
2570 static DecodebinOutputStream *
2571 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
2572 {
2573   DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
2574   gchar *pad_name;
2575   const gchar *prefix;
2576   GstStaticPadTemplate *templ;
2577   GstPadTemplate *ptmpl;
2578   guint32 *counter;
2579   GstPad *internal_pad;
2580
2581   GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
2582       res, gst_stream_type_get_name (type));
2583
2584   res->type = type;
2585   res->dbin = dbin;
2586
2587   if (type & GST_STREAM_TYPE_VIDEO) {
2588     templ = &video_src_template;
2589     counter = &dbin->vpadcount;
2590     prefix = "video";
2591   } else if (type & GST_STREAM_TYPE_AUDIO) {
2592     templ = &audio_src_template;
2593     counter = &dbin->apadcount;
2594     prefix = "audio";
2595   } else if (type & GST_STREAM_TYPE_TEXT) {
2596     templ = &text_src_template;
2597     counter = &dbin->tpadcount;
2598     prefix = "text";
2599   } else {
2600     templ = &src_template;
2601     counter = &dbin->opadcount;
2602     prefix = "src";
2603   }
2604
2605   pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
2606   *counter += 1;
2607   ptmpl = gst_static_pad_template_get (templ);
2608   res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
2609   gst_object_unref (ptmpl);
2610   g_free (pad_name);
2611   gst_pad_set_active (res->src_pad, TRUE);
2612   /* Put an event probe on the internal proxy pad to detect upstream
2613    * events */
2614   internal_pad =
2615       (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
2616   gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
2617       (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
2618   gst_object_unref (internal_pad);
2619
2620   dbin->output_streams = g_list_append (dbin->output_streams, res);
2621
2622   return res;
2623 }
2624
2625 static void
2626 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
2627 {
2628   if (output->slot) {
2629     if (output->decoder_sink && output->decoder)
2630       gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
2631
2632     output->slot->output = NULL;
2633     output->slot = NULL;
2634   }
2635   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2636   gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
2637   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2638   if (output->src_exposed) {
2639     gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
2640   }
2641   if (output->decoder) {
2642     gst_element_set_locked_state (output->decoder, TRUE);
2643     gst_element_set_state (output->decoder, GST_STATE_NULL);
2644     gst_bin_remove ((GstBin *) dbin, output->decoder);
2645   }
2646   g_free (output);
2647 }
2648
2649 static GstStateChangeReturn
2650 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
2651 {
2652   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2653   GstStateChangeReturn ret;
2654
2655   /* Upwards */
2656   switch (transition) {
2657     default:
2658       break;
2659   }
2660   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2661   if (ret == GST_STATE_CHANGE_FAILURE)
2662     goto beach;
2663
2664   switch (transition) {
2665     case GST_STATE_CHANGE_PAUSED_TO_READY:
2666     {
2667       GList *tmp;
2668
2669       /* Free output streams */
2670       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2671         DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2672         free_output_stream (dbin, output);
2673       }
2674       g_list_free (dbin->output_streams);
2675       dbin->output_streams = NULL;
2676       /* Free multiqueue slots */
2677       for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2678         MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2679         free_multiqueue_slot (dbin, slot);
2680       }
2681       g_list_free (dbin->slots);
2682       dbin->slots = NULL;
2683       /* Free inputs */
2684     }
2685       break;
2686     default:
2687       break;
2688   }
2689 beach:
2690   return ret;
2691 }
2692
2693 gboolean
2694 gst_decodebin3_plugin_init (GstPlugin * plugin)
2695 {
2696   GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");
2697
2698   return gst_element_register (plugin, "decodebin3", GST_RANK_NONE,
2699       GST_TYPE_DECODEBIN3);
2700 }