4c48e52ec1025d4e984b01994a458659005c0757
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-base / gst / playback / gstdecodebin3.c
1 /* GStreamer
2  *
3  * Copyright (C) <2015> Centricular Ltd
4  *  @author: Edward Hervey <edward@centricular.com>
5  *  @author: Jan Schmidt <jan@centricular.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <glib.h>
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
30 #include <gst/gst.h>
31 #include <gst/pbutils/pbutils.h>
32
33 #include "gstplaybackelements.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
36
37 /**
38  * SECTION:element-decodebin3
39  * @title: decodebin3
40  *
41  * #GstBin that auto-magically constructs a decoding pipeline using available
42  * decoders and demuxers via auto-plugging. The output is raw audio, video
43  * or subtitle streams.
44  *
45  * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
46  *
47  * * supports publication and selection of stream information via
48  * GstStreamCollection messages and #GST_EVENT_SELECT_STREAMS events.
49  *
50  * * dynamically switches stream connections internally, and
51  * reuses decoder elements when stream selections change, so that in
52  * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53  * and only creates new elements when streams change and an existing decoder
54  * is not capable of handling the new format.
55  *
56  * * supports multiple input pads for the parallel decoding of auxiliary streams
57  * not muxed with the primary stream.
58  *
59  * * does not handle network stream buffering. decodebin3 expects that network stream
60  * buffering is handled upstream, before data is passed to it.
61  *
62  */
63
64 /*
65  * Global design
66  *
67  * 1) From sink pad to elementary streams (GstParseBin or identity)
68  *
69  * Note : If the incoming streams are push-based-only and are compatible with
70  * either the output caps or a potential decoder, the usage of parsebin is
71  * replaced by a simple passthrough identity element.
72  *
73  * The input sink pads are fed to GstParseBin. GstParseBin will feed them
74  * through typefind. When the caps are detected (or changed) we recursively
75  * figure out which demuxer, parser or depayloader is needed until we get to
76  * elementary streams.
77  *
78  * All elementary streams (whether decoded or not, whether exposed or not) are
79  * fed through multiqueue. There is only *one* multiqueue in decodebin3.
80  *
81  * => MultiQueue is the cornerstone.
82  * => No buffering before multiqueue
83  *
84  * 2) Elementary streams
85  *
86  * After GstParseBin, there are 3 main components:
87  *  1) Input Streams (provided by GstParseBin)
88  *  2) Multiqueue slots
89  *  3) Output Streams
90  *
91  * Input Streams correspond to the stream coming from GstParseBin and that gets
92  * fed into a multiqueue slot.
93  *
94  * Output Streams correspond to the combination of a (optional) decoder and an
95  * output ghostpad. Output Streams can be moved from one multiqueue slot to
96  * another, can reconfigure itself (different decoders), and can be
97  * added/removed depending on the configuration (all streams outputted, only one
98  * of each type, ...).
99  *
100  * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
101  * each 'active' Input Stream there is a corresponding slot.
102  * Slots might have different streams on input and output (due to internal
103  * buffering).
104  *
105  * Due to internal queuing/buffering/..., all those components (might) behave
106  * asynchronously. Therefore probes will be used on each component source pad to
107  * detect various key-points:
108  *  * EOS :
109  *     the stream is done => Mark that component as done, optionally freeing/removing it
110  *  * STREAM_START :
111  *     a new stream is starting => link it further if needed
112  *
113  * 3) Gradual replacement
114  *
115  * If the caps change at any point in decodebin (input sink pad, demuxer output,
116  * multiqueue output, ..), we gradually replace (if needed) the following elements.
117  *
118  * This is handled by the probes in various locations:
119  *  a) typefind output
120  *  b) multiqueue input (source pad of Input Streams)
121  *  c) multiqueue output (source pad of Multiqueue Slots)
122  *  d) final output (target of source ghostpads)
123  *
124  * When CAPS event arrive at those points, one of three things can happen:
125  * a) There is no elements downstream yet, just create/link-to following elements
126  * b) There are downstream elements, do a ACCEPT_CAPS query
127  *  b.1) The new CAPS are accepted, keep current configuration
128  *  b.2) The new CAPS are not accepted, remove following elements then do a)
129  *
130  *    Components:
131  *
132  *                                                   MultiQ     Output
133  *                     Input(s)                      Slots      Streams
134  *  /-------------------------------------------\   /-----\  /------------- \
135  *
136  * +-------------------------------------------------------------------------+
137  * |                                                                         |
138  * | +---------------------------------------------+                         |
139  * | |   GstParseBin(s)                            |                         |
140  * | |                +--------------+             |  +-----+                |
141  * | |                |              |---[parser]-[|--| Mul |---[ decoder ]-[|
142  * |]--[ typefind ]---|  demuxer(s)  |------------[|  | ti  |                |
143  * | |                |  (if needed) |---[parser]-[|--| qu  |                |
144  * | |                |              |---[parser]-[|--| eu  |---[ decoder ]-[|
145  * | |                +--------------+             |  +------             ^  |
146  * | +---------------------------------------------+        ^             |  |
147  * |                                               ^        |             |  |
148  * +-----------------------------------------------+--------+-------------+--+
149  *                                                 |        |             |
150  *                                                 |        |             |
151  *                                       Probes  --/--------/-------------/
152  *
153  * ATOMIC SWITCHING
154  *
155  * We want to ensure we re-use decoders when switching streams. This takes place
156  * at the multiqueue output level.
157  *
158  * MAIN CONCEPTS
159  *  1) Activating a stream (i.e. linking a slot to an output) is only done within
160  *    the streaming thread in the multiqueue_src_probe() and only if the
161  *    stream is in the REQUESTED selection.
162  *  2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
163  *    within the stream thread, but only in a purposefully called IDLE probe
164  *    that calls reassign_slot().
165  *
166  * Based on those two principles, 3 "selection" of streams (stream-id) are used:
167  * 1) requested_selection
168  *    All streams within that list should be activated
169  * 2) active_selection
170  *    List of streams that are exposed by decodebin
171  * 3) to_activate
172  *    List of streams that will be moved to requested_selection in the
173  *    reassign_slot() method (i.e. once a stream was deactivated, and the output
174  *    was retargetted)
175  */
176
177
178 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
179 #define GST_CAT_DEFAULT decodebin3_debug
180
181 #define GST_TYPE_DECODEBIN3      (gst_decodebin3_get_type ())
182
183 #define EXTRA_DEBUG 1
184
185 #define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
186 #define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"
187 static GQuark
188 _custom_final_eos_quark_get (void)
189 {
190   static gsize g_quark;
191
192   if (g_once_init_enter (&g_quark)) {
193     gsize quark =
194         (gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
195     g_once_init_leave (&g_quark, quark);
196   }
197   return g_quark;
198 }
199
200 typedef struct _GstDecodebin3 GstDecodebin3;
201 typedef struct _GstDecodebin3Class GstDecodebin3Class;
202
203 typedef struct _DecodebinInputStream DecodebinInputStream;
204 typedef struct _DecodebinInput DecodebinInput;
205 typedef struct _DecodebinOutputStream DecodebinOutputStream;
206
207 struct _GstDecodebin3
208 {
209   GstBin bin;
210
211   /* input_lock protects the following variables */
212   GMutex input_lock;
213   /* Main input (static sink pad) */
214   DecodebinInput *main_input;
215   /* Supplementary input (request sink pads) */
216   GList *other_inputs;
217   /* counter for input */
218   guint32 input_counter;
219   /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
220   guint32 current_group_id;
221   /* End of variables protected by input_lock */
222
223   GstElement *multiqueue;
224   GstClockTime default_mq_min_interleave;
225   GstClockTime current_mq_min_interleave;
226
227   /* selection_lock protects access to following variables */
228   GMutex selection_lock;
229   GList *input_streams;         /* List of DecodebinInputStream for active collection */
230   GList *output_streams;        /* List of DecodebinOutputStream used for output */
231   GList *slots;                 /* List of MultiQueueSlot */
232   guint slot_id;
233
234   /* Active collection */
235   GstStreamCollection *collection;
236   /* requested selection of stream-id to activate post-multiqueue */
237   GList *requested_selection;
238   /* list of stream-id currently activated in output */
239   GList *active_selection;
240   /* List of stream-id that need to be activated (after a stream switch for ex) */
241   GList *to_activate;
242   /* Pending select streams event */
243   guint32 select_streams_seqnum;
244   /* pending list of streams to select (from downstream) */
245   GList *pending_select_streams;
246   /* TRUE if requested_selection was updated, will become FALSE once
247    * it has fully transitioned to active */
248   gboolean selection_updated;
249   /* End of variables protected by selection_lock */
250   gboolean upstream_selected;
251
252   /* Factories */
253   GMutex factories_lock;
254   guint32 factories_cookie;
255   /* All DECODABLE factories */
256   GList *factories;
257   /* Only DECODER factories */
258   GList *decoder_factories;
259   /* DECODABLE but not DECODER factories */
260   GList *decodable_factories;
261
262   /* counters for pads */
263   guint32 apadcount, vpadcount, tpadcount, opadcount;
264
265   /* Properties */
266   GstCaps *caps;
267 };
268
269 struct _GstDecodebin3Class
270 {
271   GstBinClass class;
272
273     gint (*select_stream) (GstDecodebin3 * dbin,
274       GstStreamCollection * collection, GstStream * stream);
275 };
276
277 /* Input of decodebin, controls input pad and parsebin */
278 struct _DecodebinInput
279 {
280   GstDecodebin3 *dbin;
281
282   gboolean is_main;
283
284   GstPad *ghost_sink;
285   GstPad *parsebin_sink;
286
287   GstStreamCollection *collection;      /* Active collection */
288   gboolean upstream_selected;
289
290   guint group_id;
291
292   /* Either parsebin or identity is used */
293   GstElement *parsebin;
294   GstElement *identity;
295
296   gulong pad_added_sigid;
297   gulong pad_removed_sigid;
298   gulong drained_sigid;
299
300   /* TRUE if the input got drained */
301   gboolean drained;
302
303   /* TEMPORARY HACK for knowing if upstream is already parsed and identity can
304    * be avoided */
305   gboolean input_is_parsed;
306 };
307
308 /* Multiqueue Slots */
309 typedef struct _MultiQueueSlot
310 {
311   guint id;
312
313   GstDecodebin3 *dbin;
314   /* Type of stream handled by this slot */
315   GstStreamType type;
316
317   /* Linked input and output */
318   DecodebinInputStream *input;
319
320   /* pending => last stream received on sink pad */
321   GstStream *pending_stream;
322   /* active => last stream outputted on source pad */
323   GstStream *active_stream;
324
325   GstPad *sink_pad, *src_pad;
326
327   /* id of the MQ src_pad event probe */
328   gulong probe_id;
329
330   /* TRUE if EOS was pushed out by multiqueue */
331   gboolean is_drained;
332
333   DecodebinOutputStream *output;
334 } MultiQueueSlot;
335
336 /* Streams that are exposed downstream (i.e. output) */
337 struct _DecodebinOutputStream
338 {
339   GstDecodebin3 *dbin;
340   /* The type of stream handled by this output stream */
341   GstStreamType type;
342
343   /* The slot to which this output stream is currently connected to */
344   MultiQueueSlot *slot;
345
346   GstElement *decoder;          /* Optional */
347   GstPad *decoder_sink, *decoder_src;
348   gboolean linked;
349
350   /* ghostpad */
351   GstPad *src_pad;
352   /* Flag if ghost pad is exposed */
353   gboolean src_exposed;
354
355   /* Reported decoder latency */
356   GstClockTime decoder_latency;
357
358   /* keyframe dropping probe */
359   gulong drop_probe_id;
360 };
361
362 /* properties */
363 enum
364 {
365   PROP_0,
366   PROP_CAPS
367 };
368
369 /* signals */
370 enum
371 {
372   SIGNAL_SELECT_STREAM,
373   SIGNAL_ABOUT_TO_FINISH,
374   LAST_SIGNAL
375 };
376 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
377
378 #define SELECTION_LOCK(dbin) G_STMT_START {                             \
379     GST_LOG_OBJECT (dbin,                                               \
380                     "selection locking from thread %p",                 \
381                     g_thread_self ());                                  \
382     g_mutex_lock (&dbin->selection_lock);                               \
383     GST_LOG_OBJECT (dbin,                                               \
384                     "selection locked from thread %p",                  \
385                     g_thread_self ());                                  \
386   } G_STMT_END
387
388 #define SELECTION_UNLOCK(dbin) G_STMT_START {                           \
389     GST_LOG_OBJECT (dbin,                                               \
390                     "selection unlocking from thread %p",               \
391                     g_thread_self ());                                  \
392     g_mutex_unlock (&dbin->selection_lock);                             \
393   } G_STMT_END
394
395 #define INPUT_LOCK(dbin) G_STMT_START {                         \
396     GST_LOG_OBJECT (dbin,                                               \
397                     "input locking from thread %p",                     \
398                     g_thread_self ());                                  \
399     g_mutex_lock (&dbin->input_lock);                           \
400     GST_LOG_OBJECT (dbin,                                               \
401                     "input locked from thread %p",                      \
402                     g_thread_self ());                                  \
403   } G_STMT_END
404
405 #define INPUT_UNLOCK(dbin) G_STMT_START {                               \
406     GST_LOG_OBJECT (dbin,                                               \
407                     "input unlocking from thread %p",           \
408                     g_thread_self ());                                  \
409     g_mutex_unlock (&dbin->input_lock);                         \
410   } G_STMT_END
411
412 GType gst_decodebin3_get_type (void);
413 #define gst_decodebin3_parent_class parent_class
414 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
415 #define _do_init \
416     GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");\
417     playback_element_init (plugin);
418 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (decodebin3, "decodebin3", GST_RANK_NONE,
419     GST_TYPE_DECODEBIN3, _do_init);
420
421 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
422
423 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
424     GST_PAD_SINK,
425     GST_PAD_ALWAYS,
426     GST_STATIC_CAPS_ANY);
427
428 static GstStaticPadTemplate request_sink_template =
429 GST_STATIC_PAD_TEMPLATE ("sink_%u",
430     GST_PAD_SINK,
431     GST_PAD_REQUEST,
432     GST_STATIC_CAPS_ANY);
433
434 static GstStaticPadTemplate video_src_template =
435 GST_STATIC_PAD_TEMPLATE ("video_%u",
436     GST_PAD_SRC,
437     GST_PAD_SOMETIMES,
438     GST_STATIC_CAPS_ANY);
439
440 static GstStaticPadTemplate audio_src_template =
441 GST_STATIC_PAD_TEMPLATE ("audio_%u",
442     GST_PAD_SRC,
443     GST_PAD_SOMETIMES,
444     GST_STATIC_CAPS_ANY);
445
446 static GstStaticPadTemplate text_src_template =
447 GST_STATIC_PAD_TEMPLATE ("text_%u",
448     GST_PAD_SRC,
449     GST_PAD_SOMETIMES,
450     GST_STATIC_CAPS_ANY);
451
452 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
453     GST_PAD_SRC,
454     GST_PAD_SOMETIMES,
455     GST_STATIC_CAPS_ANY);
456
457
458 static void gst_decodebin3_dispose (GObject * object);
459 static void gst_decodebin3_finalize (GObject * object);
460 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
461     const GValue * value, GParamSpec * pspec);
462 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
463     GValue * value, GParamSpec * pspec);
464
465 static gboolean parsebin_autoplug_continue_cb (GstElement *
466     parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
467
468 static gint
469 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
470     GstStreamCollection * collection, GstStream * stream)
471 {
472   GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
473
474   return -1;
475 }
476
477 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
478     GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
479 static void gst_decodebin3_release_pad (GstElement * element, GstPad * pad);
480 static void handle_stream_collection (GstDecodebin3 * dbin,
481     GstStreamCollection * collection, DecodebinInput * input);
482 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
483 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
484     GstStateChange transition);
485 static gboolean gst_decodebin3_send_event (GstElement * element,
486     GstEvent * event);
487
488 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
489 #if 0
490 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
491     GstElementFactoryListType ftype);
492 #endif
493
494 static void reset_input (GstDecodebin3 * dbin, DecodebinInput * input);
495 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
496 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
497 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
498
499 static gboolean reconfigure_output_stream (DecodebinOutputStream * output,
500     MultiQueueSlot * slot);
501 static void free_output_stream (GstDecodebin3 * dbin,
502     DecodebinOutputStream * output);
503 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
504     GstStreamType type);
505
506 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
507     GstPadProbeInfo * info, MultiQueueSlot * slot);
508 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
509 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
510     DecodebinInputStream * input);
511 static void link_input_to_slot (DecodebinInputStream * input,
512     MultiQueueSlot * slot);
513 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
514 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
515     MultiQueueSlot * slot);
516
517 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
518 static void update_requested_selection (GstDecodebin3 * dbin);
519
520 /* FIXME: Really make all the parser stuff a self-contained helper object */
521 #include "gstdecodebin3-parse.c"
522
523 static gboolean
524 _gst_int_accumulator (GSignalInvocationHint * ihint,
525     GValue * return_accu, const GValue * handler_return, gpointer dummy)
526 {
527   gint res = g_value_get_int (handler_return);
528
529   g_value_set_int (return_accu, res);
530
531   if (res == -1)
532     return TRUE;
533
534   return FALSE;
535 }
536
537 static void
538 gst_decodebin3_class_init (GstDecodebin3Class * klass)
539 {
540   GObjectClass *gobject_klass = (GObjectClass *) klass;
541   GstElementClass *element_class = (GstElementClass *) klass;
542   GstBinClass *bin_klass = (GstBinClass *) klass;
543
544   gobject_klass->dispose = gst_decodebin3_dispose;
545   gobject_klass->finalize = gst_decodebin3_finalize;
546   gobject_klass->set_property = gst_decodebin3_set_property;
547   gobject_klass->get_property = gst_decodebin3_get_property;
548
549   g_object_class_install_property (gobject_klass, PROP_CAPS,
550       g_param_spec_boxed ("caps", "Caps",
551           "The caps on which to stop decoding. (NULL = default)",
552           GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
553
554   /**
555    * GstDecodebin3::select-stream
556    * @decodebin: a #GstDecodebin3
557    * @collection: a #GstStreamCollection
558    * @stream: a #GstStream
559    *
560    * This signal is emitted whenever @decodebin needs to decide whether
561    * to expose a @stream of a given @collection.
562    *
563    * Note that the prefered way to select streams is to listen to
564    * GST_MESSAGE_STREAM_COLLECTION on the bus and send a
565    * GST_EVENT_SELECT_STREAMS with the streams the user wants.
566    *
567    * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
568    * A value of -1 (default) lets @decodebin decide what to do with the stream.
569    * */
570   gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
571       g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
572       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
573       _gst_int_accumulator, NULL, NULL,
574       G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
575
576   /**
577    * GstDecodebin3::about-to-finish:
578    *
579    * This signal is emitted when the data for the selected URI is
580    * entirely buffered and it is safe to specify another URI.
581    */
582   gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
583       g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
584       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
585
586
587   element_class->request_new_pad =
588       GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
589   element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
590   element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
591   element_class->release_pad = GST_DEBUG_FUNCPTR (gst_decodebin3_release_pad);
592
593   gst_element_class_add_pad_template (element_class,
594       gst_static_pad_template_get (&sink_template));
595   gst_element_class_add_pad_template (element_class,
596       gst_static_pad_template_get (&request_sink_template));
597   gst_element_class_add_pad_template (element_class,
598       gst_static_pad_template_get (&video_src_template));
599   gst_element_class_add_pad_template (element_class,
600       gst_static_pad_template_get (&audio_src_template));
601   gst_element_class_add_pad_template (element_class,
602       gst_static_pad_template_get (&text_src_template));
603   gst_element_class_add_pad_template (element_class,
604       gst_static_pad_template_get (&src_template));
605
606   gst_element_class_set_static_metadata (element_class,
607       "Decoder Bin 3", "Generic/Bin/Decoder",
608       "Autoplug and decode to raw media",
609       "Edward Hervey <edward@centricular.com>");
610
611   bin_klass->handle_message = gst_decodebin3_handle_message;
612
613   klass->select_stream = gst_decodebin3_select_stream;
614 }
615
616 static void
617 gst_decodebin3_init (GstDecodebin3 * dbin)
618 {
619   /* Create main input */
620   dbin->main_input = create_new_input (dbin, TRUE);
621
622   dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
623   g_object_get (dbin->multiqueue, "min-interleave-time",
624       &dbin->default_mq_min_interleave, NULL);
625   dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
626   g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
627       "max-size-buffers", 0, "use-interleave", TRUE, NULL);
628   gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
629
630   dbin->current_group_id = GST_GROUP_ID_INVALID;
631
632   g_mutex_init (&dbin->factories_lock);
633   g_mutex_init (&dbin->selection_lock);
634   g_mutex_init (&dbin->input_lock);
635
636   dbin->caps = gst_static_caps_get (&default_raw_caps);
637
638   GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
639 }
640
641 static void
642 gst_decodebin3_reset (GstDecodebin3 * dbin)
643 {
644   GList *tmp;
645
646   GST_DEBUG_OBJECT (dbin, "Resetting");
647
648   /* Free output streams */
649   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
650     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
651     free_output_stream (dbin, output);
652   }
653   g_list_free (dbin->output_streams);
654   dbin->output_streams = NULL;
655
656   /* Free multiqueue slots */
657   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
658     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
659     free_multiqueue_slot (dbin, slot);
660   }
661   g_list_free (dbin->slots);
662   dbin->slots = NULL;
663   dbin->current_group_id = GST_GROUP_ID_INVALID;
664
665   /* Reset the inputs */
666   reset_input (dbin, dbin->main_input);
667   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
668     reset_input (dbin, tmp->data);
669   }
670
671   /* Reset multiqueue to default interleave */
672   g_object_set (dbin->multiqueue, "min-interleave-time",
673       dbin->default_mq_min_interleave, NULL);
674   dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
675   dbin->upstream_selected = FALSE;
676
677   g_list_free_full (dbin->requested_selection, g_free);
678   dbin->requested_selection = NULL;
679
680   g_list_free_full (dbin->active_selection, g_free);
681   dbin->active_selection = NULL;
682
683   g_list_free (dbin->to_activate);
684   dbin->to_activate = NULL;
685
686   g_list_free (dbin->pending_select_streams);
687   dbin->pending_select_streams = NULL;
688
689   dbin->selection_updated = FALSE;
690 }
691
692 static void
693 gst_decodebin3_dispose (GObject * object)
694 {
695   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
696   GList *walk, *next;
697
698   gst_decodebin3_reset (dbin);
699
700   if (dbin->factories) {
701     gst_plugin_feature_list_free (dbin->factories);
702     dbin->factories = NULL;
703   }
704   if (dbin->decoder_factories) {
705     g_list_free (dbin->decoder_factories);
706     dbin->decoder_factories = NULL;
707   }
708   if (dbin->decodable_factories) {
709     g_list_free (dbin->decodable_factories);
710     dbin->decodable_factories = NULL;
711   }
712
713   gst_clear_object (&dbin->collection);
714
715   INPUT_LOCK (dbin);
716   if (dbin->main_input) {
717     free_input (dbin, dbin->main_input);
718     dbin->main_input = NULL;
719   }
720
721   for (walk = dbin->other_inputs; walk; walk = next) {
722     DecodebinInput *input = walk->data;
723
724     next = g_list_next (walk);
725
726     free_input (dbin, input);
727     dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
728   }
729   INPUT_UNLOCK (dbin);
730
731   G_OBJECT_CLASS (parent_class)->dispose (object);
732 }
733
734 static void
735 gst_decodebin3_finalize (GObject * object)
736 {
737   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
738
739   g_mutex_clear (&dbin->factories_lock);
740   g_mutex_clear (&dbin->selection_lock);
741   g_mutex_clear (&dbin->input_lock);
742
743   G_OBJECT_CLASS (parent_class)->finalize (object);
744 }
745
746 static void
747 gst_decodebin3_set_property (GObject * object, guint prop_id,
748     const GValue * value, GParamSpec * pspec)
749 {
750   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
751
752   switch (prop_id) {
753     case PROP_CAPS:
754       GST_OBJECT_LOCK (dbin);
755       if (dbin->caps)
756         gst_caps_unref (dbin->caps);
757       dbin->caps = g_value_dup_boxed (value);
758       GST_OBJECT_UNLOCK (dbin);
759       break;
760     default:
761       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
762       break;
763   }
764 }
765
766 static void
767 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
768     GParamSpec * pspec)
769 {
770   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
771
772   switch (prop_id) {
773     case PROP_CAPS:
774       GST_OBJECT_LOCK (dbin);
775       g_value_set_boxed (value, dbin->caps);
776       GST_OBJECT_UNLOCK (dbin);
777       break;
778     default:
779       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
780       break;
781   }
782 }
783
784 static gboolean
785 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
786     GstCaps * caps, GstDecodebin3 * dbin)
787 {
788   GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
789
790   /* If it matches our target caps, expose it */
791   if (gst_caps_can_intersect (caps, dbin->caps))
792     return FALSE;
793
794   return TRUE;
795 }
796
797 /* This method should be called whenever a STREAM_START event
798  * comes out of a given parsebin.
799  * The caller shall replace the group_id if the function returns TRUE */
800 static gboolean
801 set_input_group_id (DecodebinInput * input, guint32 * group_id)
802 {
803   GstDecodebin3 *dbin = input->dbin;
804
805   if (input->group_id != *group_id) {
806     if (input->group_id != GST_GROUP_ID_INVALID)
807       GST_WARNING_OBJECT (dbin,
808           "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
809           ") on input %p ", input->group_id, *group_id, input);
810     input->group_id = *group_id;
811   }
812
813   if (*group_id != dbin->current_group_id) {
814     /* The input is being re-used with a different incoming stream, we do want
815      * to change/unify to this new group-id */
816     if (dbin->current_group_id == GST_GROUP_ID_INVALID) {
817       GST_DEBUG_OBJECT (dbin,
818           "Setting current group id to %" G_GUINT32_FORMAT, *group_id);
819       dbin->current_group_id = *group_id;
820     } else {
821       GST_DEBUG_OBJECT (dbin, "Returning global group id %" G_GUINT32_FORMAT,
822           dbin->current_group_id);
823     }
824     *group_id = dbin->current_group_id;
825     return TRUE;
826   }
827
828   return FALSE;
829 }
830
831 static void
832 parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
833 {
834   GstDecodebin3 *dbin = input->dbin;
835   gboolean all_drained;
836   GList *tmp;
837
838   GST_INFO_OBJECT (dbin, "input %p drained", input);
839   input->drained = TRUE;
840
841   all_drained = dbin->main_input->drained;
842   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
843     DecodebinInput *data = (DecodebinInput *) tmp->data;
844
845     all_drained &= data->drained;
846   }
847
848   if (all_drained) {
849     GST_INFO_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
850     g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
851         NULL);
852   }
853 }
854
855 /* Call with INPUT_LOCK taken */
856 static gboolean
857 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
858 {
859   gboolean set_state = FALSE;
860
861   if (input->parsebin == NULL) {
862     input->parsebin = gst_element_factory_make ("parsebin", NULL);
863     if (input->parsebin == NULL)
864       goto no_parsebin;
865     input->parsebin = gst_object_ref (input->parsebin);
866     input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
867     input->pad_added_sigid =
868         g_signal_connect (input->parsebin, "pad-added",
869         (GCallback) parsebin_pad_added_cb, input);
870     input->pad_removed_sigid =
871         g_signal_connect (input->parsebin, "pad-removed",
872         (GCallback) parsebin_pad_removed_cb, input);
873     input->drained_sigid =
874         g_signal_connect (input->parsebin, "drained",
875         (GCallback) parsebin_drained_cb, input);
876     g_signal_connect (input->parsebin, "autoplug-continue",
877         (GCallback) parsebin_autoplug_continue_cb, dbin);
878   }
879
880   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
881     /* The state lock is taken so that we ensure we are the one (de)activating
882      * parsebin. We need to do this to ensure any activation taking place in
883      * parsebin (including by elements doing upstream activation) are done
884      * within the same thread. */
885     GST_STATE_LOCK (input->parsebin);
886     gst_bin_add (GST_BIN (dbin), input->parsebin);
887     set_state = TRUE;
888   }
889
890   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
891       input->parsebin_sink);
892
893   if (set_state) {
894     gst_element_sync_state_with_parent (input->parsebin);
895     GST_STATE_UNLOCK (input->parsebin);
896   }
897
898   return TRUE;
899
900   /* ERRORS */
901 no_parsebin:
902   {
903     gst_element_post_message ((GstElement *) dbin,
904         gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
905     return FALSE;
906   }
907 }
908
909 static GstPadLinkReturn
910 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
911 {
912   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
913   GstQuery *query;
914   gboolean pull_mode = FALSE;
915   GstPadLinkReturn res = GST_PAD_LINK_OK;
916   DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
917
918   g_return_val_if_fail (input, GST_PAD_LINK_REFUSED);
919
920   GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT, pad);
921
922   query = gst_query_new_scheduling ();
923   if (gst_pad_query (peer, query)
924       && gst_query_has_scheduling_mode_with_flags (query, GST_PAD_MODE_PULL,
925           GST_SCHEDULING_FLAG_SEEKABLE))
926     pull_mode = TRUE;
927   gst_query_unref (query);
928
929   GST_DEBUG_OBJECT (dbin, "Upstream can do pull-based : %d", pull_mode);
930
931   /* If upstream *can* do pull-based, we always use a parsebin. If not, we will
932    * delay that decision to a later stage (caps/stream/collection event
933    * processing) to figure out if one is really needed or whether an identity
934    * element will be enough */
935   INPUT_LOCK (dbin);
936   if (pull_mode) {
937     if (!ensure_input_parsebin (dbin, input))
938       res = GST_PAD_LINK_REFUSED;
939     else if (input->identity) {
940       GST_ERROR_OBJECT (dbin,
941           "Can't reconfigure input from push-based to pull-based");
942       res = GST_PAD_LINK_REFUSED;
943     }
944   }
945
946   /* Clear stream-collection corresponding to current INPUT.  We do not
947    * recalculate the global one yet, it will be done when at least one
948    * collection is received/computed for this input.
949    */
950   if (input->collection) {
951     GST_DEBUG_OBJECT (pad, "Clearing input collection");
952     gst_object_unref (input->collection);
953     input->collection = NULL;
954   }
955
956   INPUT_UNLOCK (dbin);
957
958   return res;
959 }
960
961 /* Drop duration query during _input_pad_unlink */
962 static GstPadProbeReturn
963 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
964     DecodebinInput * input)
965 {
966   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
967
968   if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
969     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
970     if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
971       GST_LOG_OBJECT (pad, "stop forwarding query duration");
972       ret = GST_PAD_PROBE_HANDLED;
973     }
974   }
975
976   return ret;
977 }
978
979 /* CALL with INPUT LOCK */
980 static void
981 recalculate_group_id (GstDecodebin3 * dbin)
982 {
983   guint32 common_group_id;
984   GList *iter;
985
986   GST_DEBUG_OBJECT (dbin,
987       "recalculating, current global group_id: %" G_GUINT32_FORMAT,
988       dbin->current_group_id);
989
990   common_group_id = dbin->main_input->group_id;
991
992   for (iter = dbin->other_inputs; iter; iter = iter->next) {
993     DecodebinInput *input = iter->data;
994
995     if (input->group_id != common_group_id) {
996       if (common_group_id != GST_GROUP_ID_INVALID)
997         return;
998
999       common_group_id = input->group_id;
1000     }
1001   }
1002
1003   if (common_group_id == dbin->current_group_id) {
1004     GST_DEBUG_OBJECT (dbin, "Global group_id hasn't changed");
1005   } else {
1006     GST_DEBUG_OBJECT (dbin, "Updating global group_id to %" G_GUINT32_FORMAT,
1007         common_group_id);
1008     dbin->current_group_id = common_group_id;
1009   }
1010 }
1011
1012 /* CALL with INPUT LOCK */
1013 static void
1014 reset_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
1015 {
1016   GList *iter;
1017
1018   if (input->parsebin == NULL)
1019     return;
1020
1021   GST_DEBUG_OBJECT (dbin, "Resetting %" GST_PTR_FORMAT, input->parsebin);
1022
1023   INPUT_LOCK (dbin);
1024   GST_STATE_LOCK (dbin);
1025   gst_element_set_state (input->parsebin, GST_STATE_NULL);
1026   input->drained = FALSE;
1027   input->group_id = GST_GROUP_ID_INVALID;
1028   recalculate_group_id (dbin);
1029   for (iter = dbin->input_streams; iter; iter = iter->next) {
1030     DecodebinInputStream *istream = iter->data;
1031     if (istream->input == input)
1032       istream->saw_eos = TRUE;
1033   }
1034   gst_element_sync_state_with_parent (input->parsebin);
1035   GST_STATE_UNLOCK (dbin);
1036   INPUT_UNLOCK (dbin);
1037 }
1038
1039
1040 static void
1041 gst_decodebin3_input_pad_unlink (GstPad * pad, GstPad * peer,
1042     DecodebinInput * input)
1043 {
1044   GstDecodebin3 *dbin = input->dbin;
1045
1046   g_return_if_fail (input);
1047
1048   GST_LOG_OBJECT (dbin, "Got unlink on input pad %" GST_PTR_FORMAT, pad);
1049
1050   INPUT_LOCK (dbin);
1051
1052   if (input->parsebin && GST_PAD_MODE (pad) == GST_PAD_MODE_PULL) {
1053     GST_DEBUG_OBJECT (dbin, "Resetting parsebin since it's pull-based");
1054     reset_input_parsebin (dbin, input);
1055   }
1056   /* In all cases we will be receiving new stream-start and data */
1057   input->group_id = GST_GROUP_ID_INVALID;
1058   input->drained = FALSE;
1059   recalculate_group_id (dbin);
1060
1061   INPUT_UNLOCK (dbin);
1062 }
1063
1064 static void
1065 gst_decodebin3_release_pad (GstElement * element, GstPad * pad)
1066 {
1067   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1068   DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
1069   GstStreamCollection *collection = NULL;
1070   gulong probe_id = 0;
1071   GstMessage *msg;
1072
1073   g_return_if_fail (input);
1074   GST_LOG_OBJECT (dbin, "Releasing pad %" GST_PTR_FORMAT, pad);
1075
1076   INPUT_LOCK (dbin);
1077
1078   /* Clear stream-collection corresponding to current INPUT and post new
1079    * stream-collection message, if needed */
1080   if (input->collection) {
1081     gst_object_unref (input->collection);
1082     input->collection = NULL;
1083   }
1084
1085   SELECTION_LOCK (dbin);
1086   collection = get_merged_collection (dbin);
1087   if (!collection) {
1088     SELECTION_UNLOCK (dbin);
1089     goto beach;
1090   }
1091   if (collection == dbin->collection) {
1092     SELECTION_UNLOCK (dbin);
1093     gst_object_unref (collection);
1094     goto beach;
1095   }
1096
1097   GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
1098
1099   if (dbin->collection)
1100     gst_object_unref (dbin->collection);
1101   dbin->collection = collection;
1102   dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1103
1104   msg =
1105       gst_message_new_stream_collection ((GstObject *) dbin, dbin->collection);
1106
1107   if (input->parsebin)
1108     /* Drop duration queries that the application might be doing while this message is posted */
1109     probe_id = gst_pad_add_probe (input->parsebin_sink,
1110         GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
1111         (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
1112
1113   SELECTION_UNLOCK (dbin);
1114   gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
1115   update_requested_selection (dbin);
1116
1117   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
1118   if (input->parsebin) {
1119     gst_bin_remove (GST_BIN (dbin), input->parsebin);
1120     gst_element_set_state (input->parsebin, GST_STATE_NULL);
1121     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
1122     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
1123     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
1124     gst_pad_remove_probe (input->parsebin_sink, probe_id);
1125     gst_object_unref (input->parsebin);
1126     gst_object_unref (input->parsebin_sink);
1127
1128     input->parsebin = NULL;
1129     input->parsebin_sink = NULL;
1130   }
1131   if (input->identity) {
1132     GstPad *idpad = gst_element_get_static_pad (input->identity, "src");
1133     DecodebinInputStream *stream = find_input_stream_for_pad (dbin, idpad);
1134     gst_object_unref (idpad);
1135     remove_input_stream (dbin, stream);
1136     gst_element_set_state (input->identity, GST_STATE_NULL);
1137     gst_bin_remove (GST_BIN (dbin), input->identity);
1138     gst_object_unref (input->identity);
1139     input->identity = NULL;
1140   }
1141
1142   if (!input->is_main) {
1143     dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
1144     free_input (dbin, input);
1145   }
1146
1147 beach:
1148   INPUT_UNLOCK (dbin);
1149   return;
1150 }
1151
1152 /* Call with INPUT LOCK */
1153 static void
1154 reset_input (GstDecodebin3 * dbin, DecodebinInput * input)
1155 {
1156   GST_LOG_OBJECT (dbin, "Resetting input %p", input);
1157
1158   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
1159
1160   if (input->parsebin) {
1161     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
1162     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
1163     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
1164     gst_element_set_state (input->parsebin, GST_STATE_NULL);
1165     gst_clear_object (&input->parsebin);
1166     gst_clear_object (&input->parsebin_sink);
1167   }
1168   if (input->identity) {
1169     GstPad *idpad = gst_element_get_static_pad (input->identity, "src");
1170     DecodebinInputStream *stream = find_input_stream_for_pad (dbin, idpad);
1171     gst_object_unref (idpad);
1172     remove_input_stream (dbin, stream);
1173     gst_element_set_state (input->identity, GST_STATE_NULL);
1174     gst_clear_object (&input->identity);
1175   }
1176   if (input->collection)
1177     gst_clear_object (&input->collection);
1178
1179   input->group_id = GST_GROUP_ID_INVALID;
1180 }
1181
1182 /* Call with INPUT LOCK */
1183 static void
1184 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
1185 {
1186   reset_input (dbin, input);
1187
1188   GST_LOG_OBJECT (dbin, "Freeing input %p", input);
1189
1190   INPUT_UNLOCK (dbin);
1191   gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
1192   INPUT_LOCK (dbin);
1193
1194   g_free (input);
1195 }
1196
1197 static gboolean
1198 sink_query_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstQuery * query)
1199 {
1200   DecodebinInput *input =
1201       g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1202
1203   g_return_val_if_fail (input, FALSE);
1204
1205   GST_DEBUG_OBJECT (sinkpad, "query %" GST_PTR_FORMAT, query);
1206
1207   /* We accept any caps, since we will reconfigure ourself internally if the new
1208    * stream is incompatible */
1209   if (GST_QUERY_TYPE (query) == GST_QUERY_ACCEPT_CAPS) {
1210     GST_DEBUG_OBJECT (dbin, "Accepting ACCEPT_CAPS query");
1211     gst_query_set_accept_caps_result (query, TRUE);
1212     return TRUE;
1213   }
1214   return gst_pad_query_default (sinkpad, GST_OBJECT (dbin), query);
1215 }
1216
1217 static gboolean
1218 is_parsebin_required_for_input (GstDecodebin3 * dbin, DecodebinInput * input,
1219     GstCaps * newcaps, GstPad * sinkpad)
1220 {
1221   gboolean parsebin_needed = TRUE;
1222   GstStream *stream;
1223
1224   stream = gst_pad_get_stream (sinkpad);
1225
1226   if (stream == NULL) {
1227     /* If upstream didn't provide a `GstStream` we will need to create a
1228      * parsebin to handle that stream */
1229     GST_DEBUG_OBJECT (sinkpad,
1230         "Need to create parsebin since upstream doesn't provide GstStream");
1231   } else if (gst_caps_can_intersect (newcaps, dbin->caps)) {
1232     /* If the incoming caps match decodebin3 output, no processing is needed */
1233     GST_FIXME_OBJECT (sinkpad, "parsebin not needed (matches output caps) !");
1234     parsebin_needed = FALSE;
1235   } else if (input->input_is_parsed) {
1236     GST_DEBUG_OBJECT (sinkpad, "input is parsed, no parsebin needed");
1237     parsebin_needed = FALSE;
1238   } else {
1239     GList *decoder_list;
1240     /* If the incoming caps are compatible with a decoder, we don't need to
1241      * process it before */
1242     g_mutex_lock (&dbin->factories_lock);
1243     gst_decode_bin_update_factories_list (dbin);
1244     decoder_list =
1245         gst_element_factory_list_filter (dbin->decoder_factories, newcaps,
1246         GST_PAD_SINK, TRUE);
1247     g_mutex_unlock (&dbin->factories_lock);
1248     if (decoder_list) {
1249       GST_FIXME_OBJECT (sinkpad, "parsebin not needed (available decoders) !");
1250       gst_plugin_feature_list_free (decoder_list);
1251       parsebin_needed = FALSE;
1252     }
1253   }
1254   if (stream)
1255     gst_object_unref (stream);
1256
1257   return parsebin_needed;
1258 }
1259
1260 static void
1261 setup_identify_for_input (GstDecodebin3 * dbin, DecodebinInput * input,
1262     GstPad * sinkpad)
1263 {
1264   GstPad *idsrc, *idsink;
1265   DecodebinInputStream *inputstream;
1266
1267   GST_DEBUG_OBJECT (sinkpad, "Adding identity for new input stream");
1268
1269   input->identity = gst_element_factory_make ("identity", NULL);
1270   /* We drop allocation queries due to our usage of multiqueue just
1271    * afterwards. It is just too dangerous.
1272    *
1273    * If application users want to have optimal raw source <=> sink allocations
1274    * they should not use decodebin3
1275    */
1276   g_object_set (input->identity, "drop-allocation", TRUE, NULL);
1277   input->identity = gst_object_ref (input->identity);
1278   idsink = gst_element_get_static_pad (input->identity, "sink");
1279   idsrc = gst_element_get_static_pad (input->identity, "src");
1280   gst_bin_add (GST_BIN (dbin), input->identity);
1281
1282   SELECTION_LOCK (dbin);
1283   inputstream = create_input_stream (dbin, idsrc, input);
1284   /* Forward any existing GstStream directly on the input stream */
1285   inputstream->active_stream = gst_pad_get_stream (sinkpad);
1286   SELECTION_UNLOCK (dbin);
1287
1288   gst_object_unref (idsrc);
1289   gst_object_unref (idsink);
1290   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), idsink);
1291   gst_element_sync_state_with_parent (input->identity);
1292 }
1293
1294 static gboolean
1295 sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
1296 {
1297   DecodebinInput *input =
1298       g_object_get_data (G_OBJECT (sinkpad), "decodebin.input");
1299
1300   g_return_val_if_fail (input, FALSE);
1301
1302   GST_DEBUG_OBJECT (sinkpad, "event %" GST_PTR_FORMAT, event);
1303
1304   switch (GST_EVENT_TYPE (event)) {
1305     case GST_EVENT_STREAM_START:
1306     {
1307       GstQuery *q = gst_query_new_selectable ();
1308       const GstStructure *s = gst_event_get_structure (event);
1309
1310       /* Query whether upstream can handle stream selection or not */
1311       if (gst_pad_peer_query (sinkpad, q)) {
1312         gst_query_parse_selectable (q, &input->upstream_selected);
1313         GST_DEBUG_OBJECT (sinkpad, "Upstream is selectable : %d",
1314             input->upstream_selected);
1315       } else {
1316         input->upstream_selected = FALSE;
1317         GST_DEBUG_OBJECT (sinkpad, "Upstream does not handle SELECTABLE query");
1318       }
1319       gst_query_unref (q);
1320
1321       /* FIXME : We force `decodebin3` to upstream selection mode if *any* of the
1322          inputs is. This means things might break if there's a mix */
1323       if (input->upstream_selected)
1324         dbin->upstream_selected = TRUE;
1325
1326       input->input_is_parsed = s
1327           && gst_structure_has_field (s, "urisourcebin-parsed-data");
1328
1329       /* Make sure group ids will be recalculated */
1330       input->group_id = GST_GROUP_ID_INVALID;
1331       INPUT_LOCK (dbin);
1332       recalculate_group_id (dbin);
1333       INPUT_UNLOCK (dbin);
1334       break;
1335     }
1336     case GST_EVENT_STREAM_COLLECTION:
1337     {
1338       GstStreamCollection *collection = NULL;
1339
1340       gst_event_parse_stream_collection (event, &collection);
1341       if (collection) {
1342         INPUT_LOCK (dbin);
1343         handle_stream_collection (dbin, collection, input);
1344         gst_object_unref (collection);
1345         INPUT_UNLOCK (dbin);
1346         SELECTION_LOCK (dbin);
1347         /* Post the (potentially) updated collection */
1348         if (dbin->collection) {
1349           GstMessage *msg;
1350           msg =
1351               gst_message_new_stream_collection ((GstObject *) dbin,
1352               dbin->collection);
1353           SELECTION_UNLOCK (dbin);
1354           gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
1355           update_requested_selection (dbin);
1356         } else
1357           SELECTION_UNLOCK (dbin);
1358       }
1359
1360       /* If we are waiting to create an identity passthrough, do it now */
1361       if (!input->parsebin && !input->identity)
1362         setup_identify_for_input (dbin, input, sinkpad);
1363       break;
1364     }
1365     case GST_EVENT_CAPS:
1366     {
1367       GstCaps *newcaps = NULL;
1368
1369       gst_event_parse_caps (event, &newcaps);
1370       if (!newcaps)
1371         break;
1372       GST_DEBUG_OBJECT (sinkpad, "new caps %" GST_PTR_FORMAT, newcaps);
1373
1374       /* No parsebin or identity present, check if we can avoid creating one */
1375       if (!input->parsebin && !input->identity) {
1376         if (is_parsebin_required_for_input (dbin, input, newcaps, sinkpad)) {
1377           GST_DEBUG_OBJECT (sinkpad, "parsebin is required for input");
1378           ensure_input_parsebin (dbin, input);
1379           break;
1380         }
1381         GST_DEBUG_OBJECT (sinkpad,
1382             "parsebin not required. Will create identity passthrough element once we get the collection");
1383         break;
1384       }
1385
1386       if (input->identity) {
1387         if (is_parsebin_required_for_input (dbin, input, newcaps, sinkpad)) {
1388           GST_ERROR_OBJECT (sinkpad,
1389               "Switching from passthrough to parsebin on inputs is not supported !");
1390           gst_event_unref (event);
1391           return FALSE;
1392         }
1393         /* Nothing else to do here */
1394         break;
1395       }
1396
1397       /* Check if the parsebin present can handle the new caps */
1398       g_assert (input->parsebin);
1399       GST_DEBUG_OBJECT (sinkpad,
1400           "New caps, checking if they are compatible with existing parsebin");
1401       if (!gst_pad_query_accept_caps (input->parsebin_sink, newcaps)) {
1402         GST_DEBUG_OBJECT (sinkpad,
1403             "Parsebin doesn't accept the new caps %" GST_PTR_FORMAT, newcaps);
1404         /* Reset parsebin so that it reconfigures itself for the new stream format */
1405         INPUT_LOCK (dbin);
1406         reset_input_parsebin (dbin, input);
1407         INPUT_UNLOCK (dbin);
1408       } else {
1409         GST_DEBUG_OBJECT (sinkpad, "Parsebin accepts new caps");
1410       }
1411       break;
1412     }
1413     default:
1414       break;
1415   }
1416
1417   /* Chain to parent function */
1418   return gst_pad_event_default (sinkpad, GST_OBJECT (dbin), event);
1419 }
1420
1421 /* Call with INPUT_LOCK taken */
1422 static DecodebinInput *
1423 create_new_input (GstDecodebin3 * dbin, gboolean main)
1424 {
1425   DecodebinInput *input;
1426
1427   input = g_new0 (DecodebinInput, 1);
1428   input->dbin = dbin;
1429   input->is_main = main;
1430   input->group_id = GST_GROUP_ID_INVALID;
1431   if (main)
1432     input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
1433   else {
1434     gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
1435     input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
1436     g_free (pad_name);
1437   }
1438   input->upstream_selected = FALSE;
1439   g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
1440   gst_pad_set_event_function (input->ghost_sink,
1441       (GstPadEventFunction) sink_event_function);
1442   gst_pad_set_query_function (input->ghost_sink,
1443       (GstPadQueryFunction) sink_query_function);
1444   gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
1445   g_signal_connect (input->ghost_sink, "unlinked",
1446       (GCallback) gst_decodebin3_input_pad_unlink, input);
1447
1448   gst_pad_set_active (input->ghost_sink, TRUE);
1449   gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
1450
1451   return input;
1452
1453 }
1454
1455 static GstPad *
1456 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
1457     const gchar * name, const GstCaps * caps)
1458 {
1459   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1460   DecodebinInput *input;
1461   GstPad *res = NULL;
1462
1463   /* We are ignoring names for the time being, not sure it makes any sense
1464    * within the context of decodebin3 ... */
1465   input = create_new_input (dbin, FALSE);
1466   if (input) {
1467     INPUT_LOCK (dbin);
1468     dbin->other_inputs = g_list_append (dbin->other_inputs, input);
1469     res = input->ghost_sink;
1470     INPUT_UNLOCK (dbin);
1471   }
1472
1473   return res;
1474 }
1475
1476 /* Must be called with factories lock! */
1477 static void
1478 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
1479 {
1480   guint cookie;
1481
1482   cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
1483   if (!dbin->factories || dbin->factories_cookie != cookie) {
1484     GList *tmp;
1485     if (dbin->factories)
1486       gst_plugin_feature_list_free (dbin->factories);
1487     if (dbin->decoder_factories)
1488       g_list_free (dbin->decoder_factories);
1489     if (dbin->decodable_factories)
1490       g_list_free (dbin->decodable_factories);
1491     dbin->factories =
1492         gst_element_factory_list_get_elements
1493         (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
1494     dbin->factories =
1495         g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
1496     dbin->factories_cookie = cookie;
1497
1498     /* Filter decoder and other decodables */
1499     dbin->decoder_factories = NULL;
1500     dbin->decodable_factories = NULL;
1501     for (tmp = dbin->factories; tmp; tmp = tmp->next) {
1502       GstElementFactory *fact = (GstElementFactory *) tmp->data;
1503       if (gst_element_factory_list_is_type (fact,
1504               GST_ELEMENT_FACTORY_TYPE_DECODER))
1505         dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
1506       else
1507         dbin->decodable_factories =
1508             g_list_append (dbin->decodable_factories, fact);
1509     }
1510   }
1511 }
1512
1513 /* Must be called with appropriate lock if list is a protected variable */
1514 static const gchar *
1515 stream_in_list (GList * list, const gchar * sid)
1516 {
1517   GList *tmp;
1518
1519 #if EXTRA_DEBUG
1520   for (tmp = list; tmp; tmp = tmp->next) {
1521     gchar *osid = (gchar *) tmp->data;
1522     GST_DEBUG ("Checking %s against %s", sid, osid);
1523   }
1524 #endif
1525
1526   for (tmp = list; tmp; tmp = tmp->next) {
1527     const gchar *osid = (gchar *) tmp->data;
1528     if (!g_strcmp0 (sid, osid))
1529       return osid;
1530   }
1531
1532   return NULL;
1533 }
1534
1535 static GList *
1536 remove_from_list (GList * list, const gchar * sid)
1537 {
1538   GList *tmp;
1539
1540   for (tmp = list; tmp; tmp = tmp->next) {
1541     gchar *osid = tmp->data;
1542     if (!g_strcmp0 (sid, osid)) {
1543       g_free (osid);
1544       return g_list_delete_link (list, tmp);
1545     }
1546   }
1547   return list;
1548 }
1549
1550 static gboolean
1551 stream_list_equal (GList * lista, GList * listb)
1552 {
1553   GList *tmp;
1554
1555   if (g_list_length (lista) != g_list_length (listb))
1556     return FALSE;
1557
1558   for (tmp = lista; tmp; tmp = tmp->next) {
1559     gchar *osid = tmp->data;
1560     if (!stream_in_list (listb, osid))
1561       return FALSE;
1562   }
1563
1564   return TRUE;
1565 }
1566
1567 static void
1568 update_requested_selection (GstDecodebin3 * dbin)
1569 {
1570   guint i, nb;
1571   GList *tmp = NULL;
1572   gboolean all_user_selected = TRUE;
1573   GstStreamType used_types = 0;
1574   GstStreamCollection *collection;
1575
1576   /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1577    *  the switch handler will take care of the pending selection */
1578   SELECTION_LOCK (dbin);
1579   if (dbin->pending_select_streams) {
1580     GST_DEBUG_OBJECT (dbin,
1581         "No need to create pending selection, SELECT_STREAMS underway");
1582     goto beach;
1583   }
1584
1585   collection = dbin->collection;
1586   if (G_UNLIKELY (collection == NULL)) {
1587     GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
1588     goto beach;
1589   }
1590   nb = gst_stream_collection_get_size (collection);
1591
1592   /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1593   GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1594
1595   /* 3. If not, check if we already have some of the streams in the
1596    * existing active/requested selection */
1597   for (i = 0; i < nb; i++) {
1598     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1599     const gchar *sid = gst_stream_get_stream_id (stream);
1600     gint request = -1;
1601     /* Fire select-stream signal to see if outside components want to
1602      * hint at which streams should be selected */
1603     g_signal_emit (G_OBJECT (dbin),
1604         gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1605         &request);
1606     GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1607
1608     if (request == -1)
1609       all_user_selected = FALSE;
1610     if (request == 1 || (request == -1
1611             && (stream_in_list (dbin->requested_selection, sid)
1612                 || stream_in_list (dbin->active_selection, sid)))) {
1613       GstStreamType curtype = gst_stream_get_stream_type (stream);
1614       if (request == 1)
1615         GST_DEBUG_OBJECT (dbin,
1616             "Using stream requested by 'select-stream' signal : %s", sid);
1617       else
1618         GST_DEBUG_OBJECT (dbin,
1619             "Re-using stream already present in requested or active selection : %s",
1620             sid);
1621       tmp = g_list_append (tmp, (gchar *) sid);
1622       used_types |= curtype;
1623     }
1624   }
1625
1626   /* 4. If the user didn't explicitly selected all streams, match one stream of each type */
1627   if (!all_user_selected && dbin->select_streams_seqnum == GST_SEQNUM_INVALID) {
1628     for (i = 0; i < nb; i++) {
1629       GstStream *stream = gst_stream_collection_get_stream (collection, i);
1630       GstStreamType curtype = gst_stream_get_stream_type (stream);
1631       if (curtype != GST_STREAM_TYPE_UNKNOWN && !(used_types & curtype)) {
1632         const gchar *sid = gst_stream_get_stream_id (stream);
1633         GST_DEBUG_OBJECT (dbin,
1634             "Automatically selecting stream '%s' of type %s", sid,
1635             gst_stream_type_get_name (curtype));
1636         tmp = g_list_append (tmp, (gchar *) sid);
1637         used_types |= curtype;
1638       }
1639     }
1640   }
1641
1642 beach:
1643   if (stream_list_equal (tmp, dbin->requested_selection)) {
1644     /* If the selection is equal, there is nothign to do */
1645     GST_DEBUG_OBJECT (dbin, "Dropping duplicate selection");
1646     g_list_free (tmp);
1647     tmp = NULL;
1648   }
1649
1650   if (tmp) {
1651     /* Finally set the requested selection */
1652     if (dbin->requested_selection) {
1653       GST_FIXME_OBJECT (dbin,
1654           "Replacing non-NULL requested_selection, what should we do ??");
1655       g_list_free_full (dbin->requested_selection, g_free);
1656     }
1657     dbin->requested_selection =
1658         g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1659     dbin->selection_updated = TRUE;
1660     g_list_free (tmp);
1661   }
1662   SELECTION_UNLOCK (dbin);
1663 }
1664
1665 /* sort_streams:
1666  * GCompareFunc to use with lists of GstStream.
1667  * Sorts GstStreams by stream type and SELECT flag and stream-id
1668  * First video, then audio, then others.
1669  *
1670  * Return: negative if a<b, 0 if a==b, positive if a>b
1671  */
1672 static gint
1673 sort_streams (GstStream * sa, GstStream * sb)
1674 {
1675   GstStreamType typea, typeb;
1676   GstStreamFlags flaga, flagb;
1677   const gchar *ida, *idb;
1678   gint ret = 0;
1679
1680   typea = gst_stream_get_stream_type (sa);
1681   typeb = gst_stream_get_stream_type (sb);
1682
1683   GST_LOG ("sa(%s), sb(%s)", gst_stream_get_stream_id (sa),
1684       gst_stream_get_stream_id (sb));
1685
1686   /* Sort by stream type. First video, then audio, then others(text, container, unknown) */
1687   if (typea != typeb) {
1688     if (typea & GST_STREAM_TYPE_VIDEO)
1689       ret = -1;
1690     else if (typea & GST_STREAM_TYPE_AUDIO)
1691       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)) ? -1 : 1;
1692     else if (typea & GST_STREAM_TYPE_TEXT)
1693       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)
1694           && !(typeb & GST_STREAM_TYPE_AUDIO)) ? -1 : 1;
1695     else if (typea & GST_STREAM_TYPE_CONTAINER)
1696       ret = (typeb & GST_STREAM_TYPE_UNKNOWN) ? -1 : 1;
1697     else
1698       ret = 1;
1699
1700     if (ret != 0) {
1701       GST_LOG ("Sort by stream-type: %d", ret);
1702       return ret;
1703     }
1704   }
1705
1706   /* Sort by SELECT flag, if stream type is same. */
1707   flaga = gst_stream_get_stream_flags (sa);
1708   flagb = gst_stream_get_stream_flags (sb);
1709
1710   ret =
1711       (flaga & GST_STREAM_FLAG_SELECT) ? ((flagb & GST_STREAM_FLAG_SELECT) ? 0 :
1712       -1) : ((flagb & GST_STREAM_FLAG_SELECT) ? 1 : 0);
1713
1714   if (ret != 0) {
1715     GST_LOG ("Sort by SELECT flag: %d", ret);
1716     return ret;
1717   }
1718
1719   /* Sort by stream-id, if otherwise the same. */
1720   ida = gst_stream_get_stream_id (sa);
1721   idb = gst_stream_get_stream_id (sb);
1722   ret = g_strcmp0 (ida, idb);
1723
1724   GST_LOG ("Sort by stream-id: %d", ret);
1725
1726   return ret;
1727 }
1728
1729 /* Call with INPUT_LOCK taken */
1730 static GstStreamCollection *
1731 get_merged_collection (GstDecodebin3 * dbin)
1732 {
1733   gboolean needs_merge = FALSE;
1734   GstStreamCollection *res = NULL;
1735   GList *tmp;
1736   GList *unsorted_streams = NULL;
1737   guint i, nb_stream;
1738
1739   /* First check if we need to do a merge or just return the only collection */
1740   res = dbin->main_input->collection;
1741
1742   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1743     DecodebinInput *input = (DecodebinInput *) tmp->data;
1744     GST_LOG_OBJECT (dbin, "Comparing res %p input->collection %p", res,
1745         input->collection);
1746     if (input->collection && input->collection != res) {
1747       if (res) {
1748         needs_merge = TRUE;
1749         break;
1750       }
1751       res = input->collection;
1752     }
1753   }
1754
1755   if (!needs_merge) {
1756     GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1757     return res ? gst_object_ref (res) : NULL;
1758   }
1759
1760   /* We really need to create a new collection */
1761   /* FIXME : Some numbering scheme maybe ?? */
1762   res = gst_stream_collection_new ("decodebin3");
1763   if (dbin->main_input->collection) {
1764     nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1765     GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1766     for (i = 0; i < nb_stream; i++) {
1767       GstStream *stream =
1768           gst_stream_collection_get_stream (dbin->main_input->collection, i);
1769       unsorted_streams = g_list_append (unsorted_streams, stream);
1770     }
1771   }
1772
1773   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1774     DecodebinInput *input = (DecodebinInput *) tmp->data;
1775     GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1776         input->collection);
1777     if (input->collection) {
1778       nb_stream = gst_stream_collection_get_size (input->collection);
1779       GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1780       for (i = 0; i < nb_stream; i++) {
1781         GstStream *stream =
1782             gst_stream_collection_get_stream (input->collection, i);
1783         /* Only add if not already present in the list */
1784         if (!g_list_find (unsorted_streams, stream))
1785           unsorted_streams = g_list_append (unsorted_streams, stream);
1786       }
1787     }
1788   }
1789
1790   /* re-order streams : video, then audio, then others */
1791   unsorted_streams =
1792       g_list_sort (unsorted_streams, (GCompareFunc) sort_streams);
1793   for (tmp = unsorted_streams; tmp; tmp = tmp->next) {
1794     GstStream *stream = (GstStream *) tmp->data;
1795     GST_DEBUG_OBJECT (dbin, "Adding #stream(%s) to collection",
1796         gst_stream_get_stream_id (stream));
1797     gst_stream_collection_add_stream (res, gst_object_ref (stream));
1798   }
1799
1800   if (unsorted_streams)
1801     g_list_free (unsorted_streams);
1802
1803   return res;
1804 }
1805
1806 /* Call with INPUT_LOCK taken */
1807 static DecodebinInput *
1808 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1809 {
1810   DecodebinInput *input = NULL;
1811   GstElement *parent = gst_object_ref (child);
1812   GList *tmp;
1813
1814   do {
1815     GstElement *next_parent;
1816
1817     GST_DEBUG_OBJECT (dbin, "parent %s",
1818         parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1819
1820     if (parent == dbin->main_input->parsebin) {
1821       input = dbin->main_input;
1822       break;
1823     }
1824     for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1825       DecodebinInput *cur = (DecodebinInput *) tmp->data;
1826       if (parent == cur->parsebin) {
1827         input = cur;
1828         break;
1829       }
1830     }
1831     next_parent = (GstElement *) gst_element_get_parent (parent);
1832     gst_object_unref (parent);
1833     parent = next_parent;
1834
1835   } while (parent && parent != (GstElement *) dbin);
1836
1837   if (parent)
1838     gst_object_unref (parent);
1839
1840   return input;
1841 }
1842
1843 static const gchar *
1844 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1845 {
1846   guint i, len;
1847
1848   if (dbin->collection == NULL)
1849     return NULL;
1850   len = gst_stream_collection_get_size (dbin->collection);
1851   for (i = 0; i < len; i++) {
1852     GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1853     const gchar *osid = gst_stream_get_stream_id (stream);
1854     if (!g_strcmp0 (sid, osid))
1855       return osid;
1856   }
1857
1858   return NULL;
1859 }
1860
1861 /* Call with INPUT_LOCK taken */
1862 static void
1863 handle_stream_collection (GstDecodebin3 * dbin,
1864     GstStreamCollection * collection, DecodebinInput * input)
1865 {
1866 #ifndef GST_DISABLE_GST_DEBUG
1867   const gchar *upstream_id;
1868   guint i;
1869 #endif
1870   if (!input) {
1871     GST_DEBUG_OBJECT (dbin,
1872         "Couldn't find corresponding input, most likely shutting down");
1873     return;
1874   }
1875
1876   /* Replace collection in input */
1877   if (input->collection)
1878     gst_object_unref (input->collection);
1879   input->collection = gst_object_ref (collection);
1880   GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1881       input);
1882
1883   /* Merge collection if needed */
1884   collection = get_merged_collection (dbin);
1885
1886 #ifndef GST_DISABLE_GST_DEBUG
1887   /* Just some debugging */
1888   upstream_id = gst_stream_collection_get_upstream_id (collection);
1889   GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1890   GST_DEBUG ("From input %p", input);
1891   GST_DEBUG ("  %d streams", gst_stream_collection_get_size (collection));
1892   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1893     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1894     GstTagList *taglist;
1895     GstCaps *caps;
1896
1897     GST_DEBUG ("   Stream '%s'", gst_stream_get_stream_id (stream));
1898     GST_DEBUG ("     type  : %s",
1899         gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1900     GST_DEBUG ("     flags : 0x%x", gst_stream_get_stream_flags (stream));
1901     taglist = gst_stream_get_tags (stream);
1902     GST_DEBUG ("     tags  : %" GST_PTR_FORMAT, taglist);
1903     caps = gst_stream_get_caps (stream);
1904     GST_DEBUG ("     caps  : %" GST_PTR_FORMAT, caps);
1905     if (taglist)
1906       gst_tag_list_unref (taglist);
1907     if (caps)
1908       gst_caps_unref (caps);
1909   }
1910 #endif
1911
1912   /* Store collection for later usage */
1913   SELECTION_LOCK (dbin);
1914   if (dbin->collection == NULL) {
1915     dbin->collection = collection;
1916   } else {
1917     /* We need to check who emitted this collection (the owner).
1918      * If we already had a collection from that user, this one is an update,
1919      * that is to say that we need to figure out how we are going to re-use
1920      * the streams/slot */
1921     GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1922     /* FIXME : When do we switch from pending collection to active collection ?
1923      * When all streams from active collection are drained in multiqueue output ? */
1924     gst_object_unref (dbin->collection);
1925     dbin->collection = collection;
1926   }
1927   dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1928   SELECTION_UNLOCK (dbin);
1929 }
1930
1931 /* Must be called with the selection lock taken */
1932 static void
1933 gst_decodebin3_update_min_interleave (GstDecodebin3 * dbin)
1934 {
1935   GstClockTime max_latency = GST_CLOCK_TIME_NONE;
1936   GList *tmp;
1937
1938   GST_DEBUG_OBJECT (dbin, "Recalculating max latency of decoders");
1939   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1940     DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1941     if (GST_CLOCK_TIME_IS_VALID (out->decoder_latency)) {
1942       if (max_latency == GST_CLOCK_TIME_NONE
1943           || out->decoder_latency > max_latency)
1944         max_latency = out->decoder_latency;
1945     }
1946   }
1947   GST_DEBUG_OBJECT (dbin, "max latency of all decoders: %" GST_TIME_FORMAT,
1948       GST_TIME_ARGS (max_latency));
1949
1950   if (!GST_CLOCK_TIME_IS_VALID (max_latency))
1951     return;
1952
1953   /* Make sure we keep an extra overhead */
1954   max_latency += 100 * GST_MSECOND;
1955   if (max_latency == dbin->current_mq_min_interleave)
1956     return;
1957
1958   dbin->current_mq_min_interleave = max_latency;
1959   GST_DEBUG_OBJECT (dbin, "Setting mq min-interleave to %" GST_TIME_FORMAT,
1960       GST_TIME_ARGS (dbin->current_mq_min_interleave));
1961   g_object_set (dbin->multiqueue, "min-interleave-time",
1962       dbin->current_mq_min_interleave, NULL);
1963 }
1964
1965 static void
1966 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1967 {
1968   GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1969   gboolean posting_collection = FALSE;
1970
1971   GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1972
1973   switch (GST_MESSAGE_TYPE (message)) {
1974     case GST_MESSAGE_STREAM_COLLECTION:
1975     {
1976       GstStreamCollection *collection = NULL;
1977       DecodebinInput *input;
1978
1979       INPUT_LOCK (dbin);
1980       input =
1981           find_message_parsebin (dbin,
1982           (GstElement *) GST_MESSAGE_SRC (message));
1983       if (input == NULL) {
1984         GST_DEBUG_OBJECT (dbin,
1985             "Couldn't find corresponding input, most likely shutting down");
1986         INPUT_UNLOCK (dbin);
1987         break;
1988       }
1989       if (input->upstream_selected) {
1990         GST_DEBUG_OBJECT (dbin,
1991             "Upstream handles selection, not using/forwarding collection");
1992         INPUT_UNLOCK (dbin);
1993         goto drop_message;
1994       }
1995       gst_message_parse_stream_collection (message, &collection);
1996       if (collection) {
1997         handle_stream_collection (dbin, collection, input);
1998         posting_collection = TRUE;
1999       }
2000       INPUT_UNLOCK (dbin);
2001
2002       SELECTION_LOCK (dbin);
2003       if (dbin->collection) {
2004         /* Replace collection message, we most likely aggregated it */
2005         GstMessage *new_msg;
2006         new_msg =
2007             gst_message_new_stream_collection ((GstObject *) dbin,
2008             dbin->collection);
2009         gst_message_unref (message);
2010         message = new_msg;
2011       }
2012       SELECTION_UNLOCK (dbin);
2013
2014       if (collection)
2015         gst_object_unref (collection);
2016       break;
2017     }
2018     case GST_MESSAGE_LATENCY:
2019     {
2020       GList *tmp;
2021       /* Check if this is from one of our decoders */
2022       SELECTION_LOCK (dbin);
2023       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2024         DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
2025         if (out->decoder == (GstElement *) GST_MESSAGE_SRC (message)) {
2026           GstClockTime min, max;
2027           if (GST_IS_VIDEO_DECODER (out->decoder)) {
2028             gst_video_decoder_get_latency (GST_VIDEO_DECODER (out->decoder),
2029                 &min, &max);
2030             GST_DEBUG_OBJECT (dbin,
2031                 "Got latency update from one of our decoders. min: %"
2032                 GST_TIME_FORMAT " max: %" GST_TIME_FORMAT, GST_TIME_ARGS (min),
2033                 GST_TIME_ARGS (max));
2034             out->decoder_latency = min;
2035             /* Trigger recalculation */
2036             gst_decodebin3_update_min_interleave (dbin);
2037           }
2038           break;
2039         }
2040       }
2041       SELECTION_UNLOCK (dbin);
2042     }
2043     default:
2044       break;
2045   }
2046
2047   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2048
2049   if (posting_collection) {
2050     /* Figure out a selection for that collection */
2051     update_requested_selection (dbin);
2052   }
2053
2054   return;
2055
2056 drop_message:
2057   {
2058     GST_DEBUG_OBJECT (bin, "dropping message");
2059     gst_message_unref (message);
2060   }
2061 }
2062
2063 static DecodebinOutputStream *
2064 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
2065 {
2066   GList *tmp;
2067   GstStreamType stype = gst_stream_get_stream_type (stream);
2068
2069   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2070     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2071     if (output->type == stype && output->slot && output->slot->active_stream) {
2072       GstStream *tstream = output->slot->active_stream;
2073       if (!stream_in_list (dbin->requested_selection,
2074               (gchar *) gst_stream_get_stream_id (tstream))) {
2075         return output;
2076       }
2077     }
2078   }
2079
2080   return NULL;
2081 }
2082
2083 /* Give a certain slot, figure out if it should be linked to an
2084  * output stream
2085  * CALL WITH SELECTION LOCK TAKEN !*/
2086 static DecodebinOutputStream *
2087 get_output_for_slot (MultiQueueSlot * slot)
2088 {
2089   GstDecodebin3 *dbin = slot->dbin;
2090   DecodebinOutputStream *output = NULL;
2091   const gchar *stream_id;
2092   GstCaps *caps;
2093   gchar *id_in_list = NULL;
2094
2095   /* If we already have a configured output, just use it */
2096   if (slot->output != NULL)
2097     return slot->output;
2098
2099   /*
2100    * FIXME
2101    *
2102    * This method needs to be split into multiple parts
2103    *
2104    * 1) Figure out whether stream should be exposed or not
2105    *   This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
2106    *   in the default stream attribution
2107    *
2108    * 2) Figure out whether an output stream should be created, whether
2109    *   we can re-use the output stream already linked to the slot, or
2110    *   whether we need to get re-assigned another (currently used) output
2111    *   stream.
2112    */
2113
2114   stream_id = gst_stream_get_stream_id (slot->active_stream);
2115   caps = gst_stream_get_caps (slot->active_stream);
2116   GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
2117   gst_caps_unref (caps);
2118
2119   /* 0. Emit autoplug-continue signal for pending caps ? */
2120   GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
2121
2122   /* 1. if in EXPOSE_ALL_MODE, just accept */
2123   GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
2124
2125 #if 0
2126   /* FIXME : The idea around this was to avoid activating a stream for
2127    *     which we have no decoder. Unfortunately it is way too
2128    *     expensive. Need to figure out a better solution */
2129   /* 2. Is there a potential decoder (if one is required) */
2130   if (!gst_caps_can_intersect (caps, dbin->caps)
2131       && !have_factory (dbin, (GstCaps *) caps,
2132           GST_ELEMENT_FACTORY_TYPE_DECODER)) {
2133     GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
2134         caps);
2135     SELECTION_UNLOCK (dbin);
2136     gst_element_post_message (GST_ELEMENT_CAST (dbin),
2137         gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2138     SELECTION_LOCK (dbin);
2139     return NULL;
2140   }
2141 #endif
2142
2143   /* 3. In default mode check if we should expose */
2144   id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
2145   if (id_in_list || dbin->upstream_selected) {
2146     /* Check if we can steal an existing output stream we could re-use.
2147      * that is:
2148      * * an output stream whose slot->stream is not in requested
2149      * * and is of the same type as this stream
2150      */
2151     output = find_free_compatible_output (dbin, slot->active_stream);
2152     if (output) {
2153       /* Move this output from its current slot to this slot */
2154       dbin->to_activate =
2155           g_list_append (dbin->to_activate, (gchar *) stream_id);
2156       dbin->requested_selection =
2157           g_list_remove (dbin->requested_selection, id_in_list);
2158       g_free (id_in_list);
2159       SELECTION_UNLOCK (dbin);
2160       gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2161           (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
2162       SELECTION_LOCK (dbin);
2163       return NULL;
2164     }
2165
2166     output = create_output_stream (dbin, slot->type);
2167     output->slot = slot;
2168     GST_DEBUG ("Linking slot %p to new output %p", slot, output);
2169     slot->output = output;
2170     GST_DEBUG ("Adding '%s' to active_selection", stream_id);
2171     dbin->active_selection =
2172         g_list_append (dbin->active_selection, (gchar *) g_strdup (stream_id));
2173   } else
2174     GST_DEBUG ("Not creating any output for slot %p", slot);
2175
2176   return output;
2177 }
2178
2179 /* Returns SELECTED_STREAMS message if active_selection is equal to
2180  * requested_selection, else NULL.
2181  * Must be called with LOCK taken */
2182 static GstMessage *
2183 is_selection_done (GstDecodebin3 * dbin)
2184 {
2185   GList *tmp;
2186   GstMessage *msg;
2187
2188   if (!dbin->selection_updated)
2189     return NULL;
2190
2191   GST_LOG_OBJECT (dbin, "Checking");
2192
2193   if (dbin->to_activate != NULL) {
2194     GST_DEBUG ("Still have streams to activate");
2195     return NULL;
2196   }
2197   for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
2198     GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
2199     if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
2200       GST_DEBUG ("Not in active selection, returning");
2201       return NULL;
2202     }
2203   }
2204
2205   GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
2206
2207   /* We are completely active */
2208   msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
2209   if (dbin->select_streams_seqnum != GST_SEQNUM_INVALID) {
2210     gst_message_set_seqnum (msg, dbin->select_streams_seqnum);
2211   }
2212   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2213     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2214     if (output->slot) {
2215       const gchar *output_streamid =
2216           gst_stream_get_stream_id (output->slot->active_stream);
2217       GST_DEBUG_OBJECT (dbin, "Adding stream %s", output_streamid);
2218       if (stream_in_list (dbin->requested_selection, output_streamid))
2219         gst_message_streams_selected_add (msg, output->slot->active_stream);
2220       else
2221         GST_WARNING_OBJECT (dbin,
2222             "Output slot still active for old selection ?");
2223     } else
2224       GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
2225   }
2226   dbin->selection_updated = FALSE;
2227   return msg;
2228 }
2229
2230 /* Must be called with SELECTION_LOCK taken */
2231 static void
2232 check_all_slot_for_eos (GstDecodebin3 * dbin, GstEvent * ev)
2233 {
2234   gboolean all_drained = TRUE;
2235   GList *iter;
2236
2237   GST_DEBUG_OBJECT (dbin, "check slot for eos");
2238
2239   for (iter = dbin->slots; iter; iter = iter->next) {
2240     MultiQueueSlot *slot = iter->data;
2241
2242     if (!slot->output)
2243       continue;
2244
2245     if (slot->is_drained) {
2246       GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
2247       continue;
2248     }
2249
2250     all_drained = FALSE;
2251     break;
2252   }
2253
2254   /* Also check with the inputs, data might be pending */
2255   if (all_drained)
2256     all_drained = all_inputs_are_eos (dbin);
2257
2258   if (all_drained) {
2259     GST_DEBUG_OBJECT (dbin,
2260         "All active slots are drained, and no pending input, push EOS");
2261
2262     for (iter = dbin->input_streams; iter; iter = iter->next) {
2263       DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
2264       GstPad *peer = gst_pad_get_peer (input->srcpad);
2265
2266       /* Send EOS to all slots */
2267       if (peer) {
2268         GstEvent *stream_start, *eos;
2269
2270         stream_start =
2271             gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
2272
2273         /* First forward a custom STREAM_START event to reset the EOS status (if any) */
2274         if (stream_start) {
2275           GstStructure *s;
2276           GstEvent *custom_stream_start = gst_event_copy (stream_start);
2277           gst_event_unref (stream_start);
2278           s = (GstStructure *) gst_event_get_structure (custom_stream_start);
2279           gst_structure_set (s, "decodebin3-flushing-stream-start",
2280               G_TYPE_BOOLEAN, TRUE, NULL);
2281           gst_pad_send_event (peer, custom_stream_start);
2282         }
2283
2284         eos = gst_event_new_eos ();
2285         gst_event_set_seqnum (eos, gst_event_get_seqnum (ev));
2286         gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos),
2287             CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA,
2288             NULL);
2289         gst_pad_send_event (peer, eos);
2290         gst_object_unref (peer);
2291       } else
2292         GST_DEBUG_OBJECT (dbin, "no output");
2293     }
2294   }
2295 }
2296
2297 static void
2298 check_slot_reconfiguration (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2299 {
2300   DecodebinOutputStream *output;
2301
2302   SELECTION_LOCK (dbin);
2303   output = get_output_for_slot (slot);
2304   if (!output) {
2305     SELECTION_UNLOCK (dbin);
2306     return;
2307   }
2308
2309   if (!reconfigure_output_stream (output, slot)) {
2310     GST_DEBUG_OBJECT (dbin, "Removing failing stream from selection: %s ",
2311         gst_stream_get_stream_id (slot->active_stream));
2312     slot->dbin->requested_selection =
2313         remove_from_list (slot->dbin->requested_selection,
2314         gst_stream_get_stream_id (slot->active_stream));
2315     SELECTION_UNLOCK (dbin);
2316     reassign_slot (dbin, slot);
2317   } else {
2318     GstMessage *msg = is_selection_done (dbin);
2319     SELECTION_UNLOCK (dbin);
2320     if (msg)
2321       gst_element_post_message ((GstElement *) slot->dbin, msg);
2322   }
2323 }
2324
2325 static GstPadProbeReturn
2326 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
2327     MultiQueueSlot * slot)
2328 {
2329   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2330   GstDecodebin3 *dbin = slot->dbin;
2331
2332   if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
2333     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
2334
2335     GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
2336     switch (GST_EVENT_TYPE (ev)) {
2337       case GST_EVENT_STREAM_START:
2338       {
2339         GstStream *stream = NULL;
2340         const GstStructure *s = gst_event_get_structure (ev);
2341
2342         /* Drop STREAM_START events used to cleanup multiqueue */
2343         if (s
2344             && gst_structure_has_field (s,
2345                 "decodebin3-flushing-stream-start")) {
2346           ret = GST_PAD_PROBE_HANDLED;
2347           gst_event_unref (ev);
2348           break;
2349         }
2350
2351         gst_event_parse_stream (ev, &stream);
2352         if (stream == NULL) {
2353           GST_ERROR_OBJECT (pad,
2354               "Got a STREAM_START event without a GstStream");
2355           break;
2356         }
2357         slot->is_drained = FALSE;
2358         GST_DEBUG_OBJECT (pad, "Stream Start '%s'",
2359             gst_stream_get_stream_id (stream));
2360         if (slot->active_stream == NULL) {
2361           slot->active_stream = stream;
2362         } else if (slot->active_stream != stream) {
2363           GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
2364               gst_stream_get_stream_id (slot->active_stream),
2365               gst_stream_get_stream_id (stream));
2366           gst_object_unref (slot->active_stream);
2367           slot->active_stream = stream;
2368         } else
2369           gst_object_unref (stream);
2370 #if 0                           /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
2371         {
2372           gboolean is_active, is_requested;
2373           /* Quick check to see if we're in the current selection */
2374           /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
2375           SELECTION_LOCK (dbin);
2376           GST_DEBUG_OBJECT (dbin, "Checking active selection");
2377           is_active = stream_in_list (dbin->active_selection, stream_id);
2378           GST_DEBUG_OBJECT (dbin, "Checking requested selection");
2379           is_requested = stream_in_list (dbin->requested_selection, stream_id);
2380           SELECTION_UNLOCK (dbin);
2381           if (is_active)
2382             GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
2383                 slot->output);
2384           if (is_requested)
2385             GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
2386                 slot->output);
2387           else if (slot->output) {
2388             GST_DEBUG_OBJECT (pad,
2389                 "Slot needs to be deactivated ? It's no longer in requested selection");
2390           } else if (!is_active)
2391             GST_DEBUG_OBJECT (pad,
2392                 "Slot in neither active nor requested selection");
2393         }
2394 #endif
2395       }
2396         break;
2397       case GST_EVENT_CAPS:
2398       {
2399         /* Configure the output slot if needed */
2400         check_slot_reconfiguration (dbin, slot);
2401       }
2402         break;
2403       case GST_EVENT_EOS:
2404       {
2405         gboolean was_drained = slot->is_drained;
2406         slot->is_drained = TRUE;
2407
2408         /* Custom EOS handling first */
2409         if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2410                 CUSTOM_EOS_QUARK)) {
2411           /* remove custom-eos */
2412           ev = gst_event_make_writable (ev);
2413           GST_PAD_PROBE_INFO_DATA (info) = ev;
2414           gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
2415               CUSTOM_EOS_QUARK, NULL, NULL);
2416
2417           GST_LOG_OBJECT (pad, "Received custom EOS");
2418           ret = GST_PAD_PROBE_HANDLED;
2419           SELECTION_LOCK (dbin);
2420           if (slot->input == NULL) {
2421             GST_DEBUG_OBJECT (pad,
2422                 "Got custom-eos from null input stream, remove output stream");
2423             /* Remove the output */
2424             if (slot->output) {
2425               DecodebinOutputStream *output = slot->output;
2426               dbin->output_streams =
2427                   g_list_remove (dbin->output_streams, output);
2428               free_output_stream (dbin, output);
2429               /* Reacalculate min interleave */
2430               gst_decodebin3_update_min_interleave (dbin);
2431             }
2432             slot->probe_id = 0;
2433             dbin->slots = g_list_remove (dbin->slots, slot);
2434             free_multiqueue_slot_async (dbin, slot);
2435             ret = GST_PAD_PROBE_REMOVE;
2436           } else if (!was_drained) {
2437             check_all_slot_for_eos (dbin, ev);
2438           }
2439           if (ret == GST_PAD_PROBE_HANDLED)
2440             gst_event_unref (ev);
2441           SELECTION_UNLOCK (dbin);
2442           break;
2443         }
2444
2445         GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
2446             slot->input);
2447         if (slot->input == NULL) {
2448           GstPad *peer;
2449           GST_DEBUG_OBJECT (pad,
2450               "last EOS for input, forwarding and removing slot");
2451           peer = gst_pad_get_peer (pad);
2452           if (peer) {
2453             gst_pad_send_event (peer, gst_event_ref (ev));
2454             gst_object_unref (peer);
2455           }
2456           SELECTION_LOCK (dbin);
2457           /* FIXME : Shouldn't we try to re-assign the output instead of just
2458            * removing it ? */
2459           /* Remove the output */
2460           if (slot->output) {
2461             DecodebinOutputStream *output = slot->output;
2462             dbin->output_streams = g_list_remove (dbin->output_streams, output);
2463             free_output_stream (dbin, output);
2464           }
2465           slot->probe_id = 0;
2466           dbin->slots = g_list_remove (dbin->slots, slot);
2467           SELECTION_UNLOCK (dbin);
2468
2469           /* FIXME: Removing the slot is async, which means actually
2470            * unlinking the pad is async. Other things like stream-start
2471            * might flow through this (now unprobed) link before it actually
2472            * gets released */
2473           free_multiqueue_slot_async (dbin, slot);
2474           ret = GST_PAD_PROBE_REMOVE;
2475         } else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
2476                 CUSTOM_FINAL_EOS_QUARK)) {
2477           GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream");
2478         } else {
2479           GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)");
2480           /* drop current event as eos will be sent in check_all_slot_for_eos
2481            * when all output streams are also eos */
2482           ret = GST_PAD_PROBE_DROP;
2483           SELECTION_LOCK (dbin);
2484           check_all_slot_for_eos (dbin, ev);
2485           SELECTION_UNLOCK (dbin);
2486         }
2487       }
2488         break;
2489       default:
2490         break;
2491     }
2492   } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
2493     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
2494     switch (GST_QUERY_TYPE (query)) {
2495       case GST_QUERY_CAPS:
2496       {
2497         GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
2498         gst_query_set_caps_result (query, GST_CAPS_ANY);
2499         ret = GST_PAD_PROBE_HANDLED;
2500       }
2501         break;
2502
2503       case GST_QUERY_ACCEPT_CAPS:
2504       {
2505         GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
2506         /* If the current decoder doesn't accept caps, we'll reconfigure
2507          * on the actual caps event. So accept any caps. */
2508         gst_query_set_accept_caps_result (query, TRUE);
2509         ret = GST_PAD_PROBE_HANDLED;
2510       }
2511       default:
2512         break;
2513     }
2514   }
2515
2516   return ret;
2517 }
2518
2519 /* Create a new multiqueue slot for the given type
2520  *
2521  * It is up to the caller to know whether that slot is needed or not
2522  * (and release it when no longer needed) */
2523 static MultiQueueSlot *
2524 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
2525 {
2526   MultiQueueSlot *slot;
2527   GstIterator *it = NULL;
2528   GValue item = { 0, };
2529
2530   GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
2531       gst_stream_type_get_name (type));
2532   slot = g_new0 (MultiQueueSlot, 1);
2533   slot->dbin = dbin;
2534
2535   slot->id = dbin->slot_id++;
2536
2537   slot->type = type;
2538   slot->sink_pad = gst_element_request_pad_simple (dbin->multiqueue, "sink_%u");
2539   if (slot->sink_pad == NULL)
2540     goto fail;
2541
2542   it = gst_pad_iterate_internal_links (slot->sink_pad);
2543   if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
2544       || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
2545     GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
2546         GST_DEBUG_PAD_NAME (slot->src_pad));
2547     goto fail;
2548   }
2549   gst_iterator_free (it);
2550   g_value_reset (&item);
2551
2552   g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
2553
2554   /* Add event probe */
2555   slot->probe_id =
2556       gst_pad_add_probe (slot->src_pad,
2557       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2558       (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
2559
2560   GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
2561       GST_DEBUG_PAD_NAME (slot->src_pad));
2562
2563   dbin->slots = g_list_append (dbin->slots, slot);
2564
2565   return slot;
2566
2567   /* ERRORS */
2568 fail:
2569   {
2570     if (slot->sink_pad)
2571       gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2572     g_free (slot);
2573     return NULL;
2574   }
2575 }
2576
2577 /* Must be called with SELECTION_LOCK */
2578 static MultiQueueSlot *
2579 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
2580 {
2581   GList *tmp;
2582   MultiQueueSlot *empty_slot = NULL;
2583   GstStreamType input_type = 0;
2584   gchar *stream_id = NULL;
2585
2586   GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
2587       input, input->active_stream,
2588       input->
2589       active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
2590
2591   if (input->active_stream) {
2592     input_type = gst_stream_get_stream_type (input->active_stream);
2593     stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
2594   }
2595
2596   /* Go over existing slots and check if there is already one for it */
2597   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2598     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2599     /* Already used input, return that one */
2600     if (slot->input == input) {
2601       GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
2602       return slot;
2603     }
2604   }
2605
2606   /* Go amongst all unused slots of the right type and try to find a candidate */
2607   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2608     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2609     if (slot->input == NULL && input_type == slot->type) {
2610       /* Remember this empty slot for later */
2611       empty_slot = slot;
2612       /* Check if available slot is of the same stream_id */
2613       GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
2614           slot->id, slot->active_stream);
2615       if (stream_id && slot->active_stream) {
2616         gchar *ostream_id =
2617             (gchar *) gst_stream_get_stream_id (slot->active_stream);
2618         GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
2619             ostream_id, stream_id);
2620         if (!g_strcmp0 (stream_id, ostream_id))
2621           break;
2622       }
2623     }
2624   }
2625
2626   if (empty_slot) {
2627     GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
2628     empty_slot->input = input;
2629     return empty_slot;
2630   }
2631
2632   if (input_type)
2633     return create_new_slot (dbin, input_type);
2634
2635   return NULL;
2636 }
2637
2638 static void
2639 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
2640 {
2641   if (slot->input != NULL && slot->input != input) {
2642     GST_ERROR_OBJECT (slot->dbin,
2643         "Trying to link input to an already used slot");
2644     return;
2645   }
2646   gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
2647   slot->pending_stream = input->active_stream;
2648   slot->input = input;
2649 }
2650
2651 #if 0
2652 static gboolean
2653 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
2654     GstElementFactoryListType ftype)
2655 {
2656   gboolean ret = FALSE;
2657   GList *res;
2658
2659   g_mutex_lock (&dbin->factories_lock);
2660   gst_decode_bin_update_factories_list (dbin);
2661   if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
2662     res =
2663         gst_element_factory_list_filter (dbin->decoder_factories,
2664         caps, GST_PAD_SINK, TRUE);
2665   else
2666     res =
2667         gst_element_factory_list_filter (dbin->decodable_factories,
2668         caps, GST_PAD_SINK, TRUE);
2669   g_mutex_unlock (&dbin->factories_lock);
2670
2671   if (res) {
2672     ret = TRUE;
2673     gst_plugin_feature_list_free (res);
2674   }
2675
2676   return ret;
2677 }
2678 #endif
2679
2680 static GList *
2681 create_decoder_factory_list (GstDecodebin3 * dbin, GstCaps * caps)
2682 {
2683   GList *res;
2684
2685   g_mutex_lock (&dbin->factories_lock);
2686   gst_decode_bin_update_factories_list (dbin);
2687   res = gst_element_factory_list_filter (dbin->decoder_factories,
2688       caps, GST_PAD_SINK, TRUE);
2689   g_mutex_unlock (&dbin->factories_lock);
2690   return res;
2691 }
2692
2693 static GstPadProbeReturn
2694 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
2695     DecodebinOutputStream * output)
2696 {
2697   GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2698
2699   /* If we have a keyframe, remove the probe and let all data through */
2700   if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
2701       GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
2702     GST_DEBUG_OBJECT (pad,
2703         "Buffer is keyframe or header, letting through and removing probe");
2704     output->drop_probe_id = 0;
2705     return GST_PAD_PROBE_REMOVE;
2706   }
2707   GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
2708   return GST_PAD_PROBE_DROP;
2709 }
2710
2711 /* Returns FALSE if the output couldn't be properly configured and the
2712  * associated GstStreams should be disabled */
2713 static gboolean
2714 reconfigure_output_stream (DecodebinOutputStream * output,
2715     MultiQueueSlot * slot)
2716 {
2717   GstDecodebin3 *dbin = output->dbin;
2718   GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
2719   gboolean needs_decoder;
2720   gboolean ret = TRUE;
2721
2722   needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
2723
2724   GST_DEBUG_OBJECT (dbin,
2725       "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
2726       needs_decoder);
2727
2728   /* FIXME : Maybe make the output un-hook itself automatically ? */
2729   if (output->slot != NULL && output->slot != slot) {
2730     GST_WARNING_OBJECT (dbin,
2731         "Output still linked to another slot (%p)", output->slot);
2732     gst_caps_unref (new_caps);
2733     return ret;
2734   }
2735
2736   /* Check if existing config is reusable as-is by checking if
2737    * the existing decoder accepts the new caps, if not delete
2738    * it and create a new one */
2739   if (output->decoder) {
2740     gboolean can_reuse_decoder;
2741
2742     if (needs_decoder) {
2743       can_reuse_decoder =
2744           gst_pad_query_accept_caps (output->decoder_sink, new_caps);
2745     } else
2746       can_reuse_decoder = FALSE;
2747
2748     if (can_reuse_decoder) {
2749       if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
2750         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2751         output->drop_probe_id =
2752             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2753             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2754       }
2755       GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
2756       if (output->linked == FALSE) {
2757         gst_pad_link_full (slot->src_pad, output->decoder_sink,
2758             GST_PAD_LINK_CHECK_NOTHING);
2759         output->linked = TRUE;
2760       }
2761       gst_caps_unref (new_caps);
2762       return ret;
2763     }
2764
2765     GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
2766
2767     if (output->linked)
2768       gst_pad_unlink (slot->src_pad, output->decoder_sink);
2769     output->linked = FALSE;
2770     if (output->drop_probe_id) {
2771       gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2772       output->drop_probe_id = 0;
2773     }
2774
2775     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2776       GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
2777       gst_caps_unref (new_caps);
2778       goto cleanup;
2779     }
2780
2781     gst_element_set_locked_state (output->decoder, TRUE);
2782     gst_element_set_state (output->decoder, GST_STATE_NULL);
2783
2784     gst_bin_remove ((GstBin *) dbin, output->decoder);
2785     output->decoder = NULL;
2786     output->decoder_latency = GST_CLOCK_TIME_NONE;
2787   } else if (output->linked) {
2788     /* Otherwise if we have no decoder yet but the output is linked make
2789      * sure that the ghost pad is really unlinked in case no decoder was
2790      * needed previously */
2791     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2792       GST_ERROR_OBJECT (dbin, "Could not release ghost pad");
2793       gst_caps_unref (new_caps);
2794       goto cleanup;
2795     }
2796   }
2797
2798   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2799   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2800
2801   /* If a decoder is required, create one */
2802   if (needs_decoder) {
2803     GList *factories, *next_factory;
2804
2805     factories = next_factory = create_decoder_factory_list (dbin, new_caps);
2806     while (!output->decoder) {
2807       gboolean decoder_failed = FALSE;
2808
2809       /* If we don't have a decoder yet, instantiate one */
2810       if (next_factory) {
2811         output->decoder = gst_element_factory_create ((GstElementFactory *)
2812             next_factory->data, NULL);
2813         GST_DEBUG ("Created decoder '%s'", GST_ELEMENT_NAME (output->decoder));
2814       } else {
2815         GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT,
2816             new_caps);
2817         g_assert (output->decoder == NULL);
2818       }
2819
2820       if (output->decoder == NULL) {
2821         GstCaps *caps;
2822
2823         SELECTION_UNLOCK (dbin);
2824         /* FIXME : Should we be smarter if there's a missing decoder ?
2825          * Should we deactivate that stream ? */
2826         caps = gst_stream_get_caps (slot->active_stream);
2827         gst_element_post_message (GST_ELEMENT_CAST (dbin),
2828             gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2829         gst_caps_unref (caps);
2830         SELECTION_LOCK (dbin);
2831         ret = FALSE;
2832         goto cleanup;
2833       }
2834       if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
2835         GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
2836         ret = FALSE;
2837         goto cleanup;
2838       }
2839       output->decoder_sink =
2840           gst_element_get_static_pad (output->decoder, "sink");
2841       output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
2842       if (output->type & GST_STREAM_TYPE_VIDEO) {
2843         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2844         output->drop_probe_id =
2845             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2846             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2847       }
2848       if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
2849               GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2850         GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
2851             GST_DEBUG_PAD_NAME (output->decoder_sink));
2852         ret = FALSE;
2853         goto cleanup;
2854       }
2855       if (gst_element_set_state (output->decoder,
2856               GST_STATE_READY) == GST_STATE_CHANGE_FAILURE) {
2857         GST_DEBUG_OBJECT (dbin,
2858             "Decoder '%s' failed to reach READY state, trying the next type",
2859             GST_ELEMENT_NAME (output->decoder));
2860         decoder_failed = TRUE;
2861       }
2862       if (!gst_pad_query_accept_caps (output->decoder_sink, new_caps)) {
2863         GST_DEBUG_OBJECT (dbin,
2864             "Decoder '%s' did not accept the caps, trying the next type",
2865             GST_ELEMENT_NAME (output->decoder));
2866         decoder_failed = TRUE;
2867       }
2868       if (decoder_failed) {
2869         gst_pad_unlink (slot->src_pad, output->decoder_sink);
2870         if (output->drop_probe_id) {
2871           gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2872           output->drop_probe_id = 0;
2873         }
2874
2875         gst_element_set_locked_state (output->decoder, TRUE);
2876         gst_element_set_state (output->decoder, GST_STATE_NULL);
2877
2878         gst_bin_remove ((GstBin *) dbin, output->decoder);
2879         output->decoder = NULL;
2880       }
2881       next_factory = next_factory->next;
2882     }
2883     gst_plugin_feature_list_free (factories);
2884   } else {
2885     output->decoder_src = gst_object_ref (slot->src_pad);
2886     output->decoder_sink = NULL;
2887   }
2888   gst_caps_unref (new_caps);
2889
2890   output->linked = TRUE;
2891   if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
2892           output->decoder_src)) {
2893     GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
2894     ret = FALSE;
2895     goto cleanup;
2896   }
2897   if (output->src_exposed == FALSE) {
2898     GstEvent *stream_start;
2899
2900     stream_start = gst_pad_get_sticky_event (slot->src_pad,
2901         GST_EVENT_STREAM_START, 0);
2902
2903     /* Ensure GstStream is accesiable from pad-added callback */
2904     if (stream_start) {
2905       gst_pad_store_sticky_event (output->src_pad, stream_start);
2906       gst_event_unref (stream_start);
2907     } else {
2908       GST_WARNING_OBJECT (slot->src_pad,
2909           "Pad has no stored stream-start event");
2910     }
2911
2912     output->src_exposed = TRUE;
2913     gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
2914   }
2915
2916   if (output->decoder)
2917     gst_element_sync_state_with_parent (output->decoder);
2918
2919   output->slot = slot;
2920   return ret;
2921
2922 cleanup:
2923   {
2924     GST_DEBUG_OBJECT (dbin, "Cleanup");
2925     if (output->decoder_sink) {
2926       gst_object_unref (output->decoder_sink);
2927       output->decoder_sink = NULL;
2928     }
2929     if (output->decoder_src) {
2930       gst_object_unref (output->decoder_src);
2931       output->decoder_src = NULL;
2932     }
2933     if (output->decoder) {
2934       gst_element_set_state (output->decoder, GST_STATE_NULL);
2935       gst_bin_remove ((GstBin *) dbin, output->decoder);
2936       output->decoder = NULL;
2937     }
2938     return ret;
2939   }
2940 }
2941
2942 static GstPadProbeReturn
2943 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2944 {
2945   GstDecodebin3 *dbin = slot->dbin;
2946
2947   check_slot_reconfiguration (dbin, slot);
2948
2949   return GST_PAD_PROBE_REMOVE;
2950 }
2951
2952 static MultiQueueSlot *
2953 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2954 {
2955   GList *tmp;
2956
2957   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2958     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2959     const gchar *stream_id;
2960     if (slot->active_stream) {
2961       stream_id = gst_stream_get_stream_id (slot->active_stream);
2962       if (!g_strcmp0 (sid, stream_id))
2963         return slot;
2964     }
2965     if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2966       stream_id = gst_stream_get_stream_id (slot->pending_stream);
2967       if (!g_strcmp0 (sid, stream_id))
2968         return slot;
2969     }
2970   }
2971
2972   return NULL;
2973 }
2974
2975 /* This function handles the reassignment of a slot. Call this from
2976  * the streaming thread of a slot. */
2977 static gboolean
2978 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2979 {
2980   DecodebinOutputStream *output;
2981   MultiQueueSlot *target_slot = NULL;
2982   GList *tmp;
2983   const gchar *sid, *tsid;
2984
2985   SELECTION_LOCK (dbin);
2986   output = slot->output;
2987
2988   if (G_UNLIKELY (slot->active_stream == NULL)) {
2989     GST_DEBUG_OBJECT (slot->src_pad,
2990         "Called on inactive slot (active_stream == NULL)");
2991     SELECTION_UNLOCK (dbin);
2992     return FALSE;
2993   }
2994
2995   if (G_UNLIKELY (output == NULL)) {
2996     GST_DEBUG_OBJECT (slot->src_pad,
2997         "Slot doesn't have any output to be removed");
2998     SELECTION_UNLOCK (dbin);
2999     return FALSE;
3000   }
3001
3002   sid = gst_stream_get_stream_id (slot->active_stream);
3003   GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
3004
3005   /* Recheck whether this stream is still in the list of streams to deactivate */
3006   if (stream_in_list (dbin->requested_selection, sid)) {
3007     /* Stream is in the list of requested streams, don't remove */
3008     SELECTION_UNLOCK (dbin);
3009     GST_DEBUG_OBJECT (slot->src_pad,
3010         "Stream '%s' doesn't need to be deactivated", sid);
3011     return FALSE;
3012   }
3013
3014   /* Unlink slot from output */
3015   /* FIXME : Handle flushing ? */
3016   /* FIXME : Handle outputs without decoders */
3017   GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
3018       output->decoder_sink);
3019   if (output->decoder_sink)
3020     gst_pad_unlink (slot->src_pad, output->decoder_sink);
3021   output->linked = FALSE;
3022   slot->output = NULL;
3023   output->slot = NULL;
3024   /* Remove sid from active selection */
3025   GST_DEBUG ("Removing '%s' from active_selection", sid);
3026   for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
3027     if (!g_strcmp0 (sid, tmp->data)) {
3028       g_free (tmp->data);
3029       dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
3030       break;
3031     }
3032
3033   /* Can we re-assign this output to a requested stream ? */
3034   GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
3035   for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
3036     MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
3037     GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
3038         tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
3039     if (tslot && tslot->type == output->type && tslot->output == NULL) {
3040       GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
3041       target_slot = tslot;
3042       tsid = tmp->data;
3043       /* Pass target stream id to requested selection */
3044       dbin->requested_selection =
3045           g_list_append (dbin->requested_selection, g_strdup (tmp->data));
3046       dbin->to_activate = g_list_delete_link (dbin->to_activate, tmp);
3047       break;
3048     }
3049   }
3050
3051   if (target_slot) {
3052     GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
3053         target_slot, tsid);
3054     target_slot->output = output;
3055     output->slot = target_slot;
3056     GST_DEBUG ("Adding '%s' to active_selection", tsid);
3057     dbin->active_selection =
3058         g_list_append (dbin->active_selection, (gchar *) g_strdup (tsid));
3059     SELECTION_UNLOCK (dbin);
3060
3061     /* Wakeup the target slot so that it retries to send events/buffers
3062      * thereby triggering the output reconfiguration codepath */
3063     gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
3064         (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
3065     /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
3066   } else {
3067     GstMessage *msg;
3068
3069     dbin->output_streams = g_list_remove (dbin->output_streams, output);
3070     free_output_stream (dbin, output);
3071     msg = is_selection_done (slot->dbin);
3072     SELECTION_UNLOCK (dbin);
3073
3074     if (msg)
3075       gst_element_post_message ((GstElement *) slot->dbin, msg);
3076   }
3077
3078   return TRUE;
3079 }
3080
3081 /* Idle probe called when a slot should be unassigned from its output stream.
3082  * This is needed to ensure nothing is flowing when unlinking the slot.
3083  *
3084  * Also, this method will search for a pending stream which could re-use
3085  * the output stream. */
3086 static GstPadProbeReturn
3087 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
3088     MultiQueueSlot * slot)
3089 {
3090   GstDecodebin3 *dbin = slot->dbin;
3091
3092   reassign_slot (dbin, slot);
3093
3094   return GST_PAD_PROBE_REMOVE;
3095 }
3096
3097 static gboolean
3098 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
3099     guint32 seqnum)
3100 {
3101   gboolean ret = TRUE;
3102   GList *tmp;
3103   /* List of slots to (de)activate. */
3104   GList *to_deactivate = NULL;
3105   GList *to_activate = NULL;
3106   /* List of unknown stream id, most likely means the event
3107    * should be sent upstream so that elements can expose the requested stream */
3108   GList *unknown = NULL;
3109   GList *to_reassign = NULL;
3110   GList *future_request_streams = NULL;
3111   GList *pending_streams = NULL;
3112   GList *slots_to_reassign = NULL;
3113
3114   SELECTION_LOCK (dbin);
3115   if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
3116     GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
3117     SELECTION_UNLOCK (dbin);
3118     return TRUE;
3119   }
3120   /* Remove pending select_streams */
3121   g_list_free (dbin->pending_select_streams);
3122   dbin->pending_select_streams = NULL;
3123
3124   /* COMPARE the requested streams to the active and requested streams
3125    * on multiqueue. */
3126
3127   /* First check the slots to activate and which ones are unknown */
3128   for (tmp = select_streams; tmp; tmp = tmp->next) {
3129     const gchar *sid = (const gchar *) tmp->data;
3130     MultiQueueSlot *slot;
3131     GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
3132     slot = find_slot_for_stream_id (dbin, sid);
3133     /* Find the corresponding slot */
3134     if (slot == NULL) {
3135       if (stream_in_collection (dbin, (gchar *) sid)) {
3136         pending_streams = g_list_append (pending_streams, (gchar *) sid);
3137       } else {
3138         GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
3139         unknown = g_list_append (unknown, (gchar *) sid);
3140       }
3141     } else if (slot->output == NULL) {
3142       GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
3143           slot, sid);
3144       to_activate = g_list_append (to_activate, slot);
3145     } else {
3146       GST_DEBUG_OBJECT (dbin,
3147           "Stream '%s' from slot %p is already active on output %p", sid, slot,
3148           slot->output);
3149       future_request_streams =
3150           g_list_append (future_request_streams, (gchar *) sid);
3151     }
3152   }
3153
3154   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
3155     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3156     /* For slots that have an output, check if it's part of the streams to
3157      * be active */
3158     if (slot->output) {
3159       gboolean slot_to_deactivate = TRUE;
3160
3161       if (slot->active_stream) {
3162         if (stream_in_list (select_streams,
3163                 gst_stream_get_stream_id (slot->active_stream)))
3164           slot_to_deactivate = FALSE;
3165       }
3166       if (slot_to_deactivate && slot->pending_stream
3167           && slot->pending_stream != slot->active_stream) {
3168         if (stream_in_list (select_streams,
3169                 gst_stream_get_stream_id (slot->pending_stream)))
3170           slot_to_deactivate = FALSE;
3171       }
3172       if (slot_to_deactivate) {
3173         GST_DEBUG_OBJECT (dbin,
3174             "Slot %p (%s) should be deactivated, no longer used", slot,
3175             slot->
3176             active_stream ? gst_stream_get_stream_id (slot->active_stream) :
3177             "NULL");
3178         to_deactivate = g_list_append (to_deactivate, slot);
3179       }
3180     }
3181   }
3182
3183   if (to_deactivate != NULL) {
3184     GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
3185     /* We need to compare what needs to be activated and deactivated in order
3186      * to determine whether there are outputs that can be transferred */
3187     /* Take the stream-id of the slots that are to be activated, for which there
3188      * is a slot of the same type that needs to be deactivated */
3189     tmp = to_deactivate;
3190     while (tmp) {
3191       MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
3192       gboolean removeit = FALSE;
3193       GList *tmp2, *next;
3194       GST_DEBUG_OBJECT (dbin,
3195           "Checking if slot to deactivate (%p) has a candidate slot to activate",
3196           slot_to_deactivate);
3197       for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
3198         MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
3199         GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
3200         if (slot_to_activate->type == slot_to_deactivate->type) {
3201           GST_DEBUG_OBJECT (dbin, "Re-using");
3202           to_reassign = g_list_append (to_reassign, (gchar *)
3203               gst_stream_get_stream_id (slot_to_activate->active_stream));
3204           slots_to_reassign =
3205               g_list_append (slots_to_reassign, slot_to_deactivate);
3206           to_activate = g_list_remove (to_activate, slot_to_activate);
3207           removeit = TRUE;
3208           break;
3209         }
3210       }
3211       next = tmp->next;
3212       if (removeit)
3213         to_deactivate = g_list_delete_link (to_deactivate, tmp);
3214       tmp = next;
3215     }
3216   }
3217
3218   for (tmp = to_deactivate; tmp; tmp = tmp->next) {
3219     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3220     GST_DEBUG_OBJECT (dbin,
3221         "Really need to deactivate slot %p, but no available alternative",
3222         slot);
3223
3224     slots_to_reassign = g_list_append (slots_to_reassign, slot);
3225   }
3226
3227   /* The only slots left to activate are the ones that won't be reassigned and
3228    * therefore really need to have a new output created */
3229   for (tmp = to_activate; tmp; tmp = tmp->next) {
3230     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3231     if (slot->active_stream)
3232       future_request_streams =
3233           g_list_append (future_request_streams,
3234           (gchar *) gst_stream_get_stream_id (slot->active_stream));
3235     else if (slot->pending_stream)
3236       future_request_streams =
3237           g_list_append (future_request_streams,
3238           (gchar *) gst_stream_get_stream_id (slot->pending_stream));
3239     else
3240       GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
3241   }
3242
3243   if (to_activate == NULL && pending_streams != NULL) {
3244     GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
3245     if (dbin->requested_selection)
3246       g_list_free_full (dbin->requested_selection, g_free);
3247     dbin->requested_selection =
3248         g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
3249     g_list_free (to_deactivate);
3250     g_list_free (pending_streams);
3251     to_deactivate = NULL;
3252     pending_streams = NULL;
3253   } else {
3254     if (dbin->requested_selection)
3255       g_list_free_full (dbin->requested_selection, g_free);
3256     dbin->requested_selection =
3257         g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
3258     dbin->requested_selection =
3259         g_list_concat (dbin->requested_selection,
3260         g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
3261     if (dbin->to_activate)
3262       g_list_free (dbin->to_activate);
3263     dbin->to_activate = g_list_copy (to_reassign);
3264   }
3265
3266   dbin->selection_updated = TRUE;
3267   SELECTION_UNLOCK (dbin);
3268
3269   if (unknown) {
3270     GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
3271     g_list_free (unknown);
3272   }
3273
3274   if (to_activate && !slots_to_reassign) {
3275     for (tmp = to_activate; tmp; tmp = tmp->next) {
3276       MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3277       gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
3278           (GstPadProbeCallback) idle_reconfigure, slot, NULL);
3279     }
3280   }
3281
3282   /* For all streams to deactivate, add an idle probe where we will do
3283    * the unassignment and switch over */
3284   for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
3285     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3286     gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
3287         (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
3288   }
3289
3290   if (to_deactivate)
3291     g_list_free (to_deactivate);
3292   if (to_activate)
3293     g_list_free (to_activate);
3294   if (to_reassign)
3295     g_list_free (to_reassign);
3296   if (future_request_streams)
3297     g_list_free (future_request_streams);
3298   if (pending_streams)
3299     g_list_free (pending_streams);
3300   if (slots_to_reassign)
3301     g_list_free (slots_to_reassign);
3302
3303   return ret;
3304 }
3305
3306 static GstPadProbeReturn
3307 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
3308     DecodebinOutputStream * output)
3309 {
3310   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
3311   GstDecodebin3 *dbin = output->dbin;
3312   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
3313
3314   GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
3315
3316   switch (GST_EVENT_TYPE (event)) {
3317     case GST_EVENT_SELECT_STREAMS:
3318     {
3319       GstPad *peer;
3320       GList *streams = NULL;
3321       guint32 seqnum = gst_event_get_seqnum (event);
3322
3323       if (dbin->upstream_selected) {
3324         GST_DEBUG_OBJECT (pad, "Letting select-streams event flow upstream");
3325         break;
3326       }
3327
3328       SELECTION_LOCK (dbin);
3329       if (seqnum == dbin->select_streams_seqnum) {
3330         SELECTION_UNLOCK (dbin);
3331         GST_DEBUG_OBJECT (pad,
3332             "Already handled/handling that SELECT_STREAMS event");
3333         gst_event_unref (event);
3334         ret = GST_PAD_PROBE_HANDLED;
3335         break;
3336       }
3337       dbin->select_streams_seqnum = seqnum;
3338       if (dbin->pending_select_streams != NULL) {
3339         GST_LOG_OBJECT (dbin, "Replacing pending select streams");
3340         g_list_free (dbin->pending_select_streams);
3341         dbin->pending_select_streams = NULL;
3342       }
3343       gst_event_parse_select_streams (event, &streams);
3344       dbin->pending_select_streams = g_list_copy (streams);
3345       SELECTION_UNLOCK (dbin);
3346
3347       /* Send event upstream */
3348       if ((peer = gst_pad_get_peer (pad))) {
3349         gst_pad_send_event (peer, event);
3350         gst_object_unref (peer);
3351       } else {
3352         gst_event_unref (event);
3353       }
3354       /* Finally handle the switch */
3355       if (streams) {
3356         handle_stream_switch (dbin, streams, seqnum);
3357         g_list_free_full (streams, g_free);
3358       }
3359       ret = GST_PAD_PROBE_HANDLED;
3360     }
3361       break;
3362     default:
3363       break;
3364   }
3365
3366   return ret;
3367 }
3368
3369 static gboolean
3370 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
3371 {
3372   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3373
3374   GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
3375   if (!dbin->upstream_selected
3376       && GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
3377     GList *streams = NULL;
3378     guint32 seqnum = gst_event_get_seqnum (event);
3379
3380     SELECTION_LOCK (dbin);
3381     if (seqnum == dbin->select_streams_seqnum) {
3382       SELECTION_UNLOCK (dbin);
3383       GST_DEBUG_OBJECT (dbin,
3384           "Already handled/handling that SELECT_STREAMS event");
3385       return TRUE;
3386     }
3387     dbin->select_streams_seqnum = seqnum;
3388     if (dbin->pending_select_streams != NULL) {
3389       GST_LOG_OBJECT (dbin, "Replacing pending select streams");
3390       g_list_free (dbin->pending_select_streams);
3391       dbin->pending_select_streams = NULL;
3392     }
3393     gst_event_parse_select_streams (event, &streams);
3394     dbin->pending_select_streams = g_list_copy (streams);
3395     SELECTION_UNLOCK (dbin);
3396
3397 #if 0
3398     /* Send event upstream */
3399     if ((peer = gst_pad_get_peer (pad))) {
3400       gst_pad_send_event (peer, event);
3401       gst_object_unref (peer);
3402     }
3403 #endif
3404     /* Finally handle the switch */
3405     if (streams) {
3406       handle_stream_switch (dbin, streams, seqnum);
3407       g_list_free_full (streams, g_free);
3408     }
3409
3410     gst_event_unref (event);
3411     return TRUE;
3412   }
3413   return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
3414 }
3415
3416
3417 static void
3418 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3419 {
3420   if (slot->probe_id)
3421     gst_pad_remove_probe (slot->src_pad, slot->probe_id);
3422   if (slot->input) {
3423     if (slot->input->srcpad)
3424       gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
3425   }
3426
3427   gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
3428   gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
3429   gst_object_replace ((GstObject **) & slot->src_pad, NULL);
3430   gst_object_replace ((GstObject **) & slot->active_stream, NULL);
3431   g_free (slot);
3432 }
3433
3434 static void
3435 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
3436 {
3437   GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
3438   gst_element_call_async (GST_ELEMENT_CAST (dbin),
3439       (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
3440 }
3441
3442 /* Create a DecodebinOutputStream for a given type
3443  * Note: It will be empty initially, it needs to be configured
3444  * afterwards */
3445 static DecodebinOutputStream *
3446 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
3447 {
3448   DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
3449   gchar *pad_name;
3450   const gchar *prefix;
3451   GstStaticPadTemplate *templ;
3452   GstPadTemplate *ptmpl;
3453   guint32 *counter;
3454   GstPad *internal_pad;
3455
3456   GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
3457       res, gst_stream_type_get_name (type));
3458
3459   res->type = type;
3460   res->dbin = dbin;
3461   res->decoder_latency = GST_CLOCK_TIME_NONE;
3462
3463   if (type & GST_STREAM_TYPE_VIDEO) {
3464     templ = &video_src_template;
3465     counter = &dbin->vpadcount;
3466     prefix = "video";
3467   } else if (type & GST_STREAM_TYPE_AUDIO) {
3468     templ = &audio_src_template;
3469     counter = &dbin->apadcount;
3470     prefix = "audio";
3471   } else if (type & GST_STREAM_TYPE_TEXT) {
3472     templ = &text_src_template;
3473     counter = &dbin->tpadcount;
3474     prefix = "text";
3475   } else {
3476     templ = &src_template;
3477     counter = &dbin->opadcount;
3478     prefix = "src";
3479   }
3480
3481   pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
3482   *counter += 1;
3483   ptmpl = gst_static_pad_template_get (templ);
3484   res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
3485   gst_object_unref (ptmpl);
3486   g_free (pad_name);
3487   gst_pad_set_active (res->src_pad, TRUE);
3488   /* Put an event probe on the internal proxy pad to detect upstream
3489    * events */
3490   internal_pad =
3491       (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
3492   gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
3493       (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
3494   gst_object_unref (internal_pad);
3495
3496   dbin->output_streams = g_list_append (dbin->output_streams, res);
3497
3498   return res;
3499 }
3500
3501 static void
3502 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
3503 {
3504   if (output->slot) {
3505     if (output->decoder_sink && output->decoder)
3506       gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
3507
3508     output->slot->output = NULL;
3509     output->slot = NULL;
3510   }
3511   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
3512   gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
3513   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
3514   if (output->src_exposed) {
3515     gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
3516   }
3517   if (output->decoder) {
3518     gst_element_set_locked_state (output->decoder, TRUE);
3519     gst_element_set_state (output->decoder, GST_STATE_NULL);
3520     gst_bin_remove ((GstBin *) dbin, output->decoder);
3521   }
3522   g_free (output);
3523 }
3524
3525 static GstStateChangeReturn
3526 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
3527 {
3528   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3529   GstStateChangeReturn ret;
3530
3531   /* Upwards */
3532   switch (transition) {
3533     default:
3534       break;
3535   }
3536   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3537   if (ret == GST_STATE_CHANGE_FAILURE)
3538     goto beach;
3539
3540   switch (transition) {
3541     case GST_STATE_CHANGE_PAUSED_TO_READY:
3542       gst_decodebin3_reset (dbin);
3543       break;
3544     default:
3545       break;
3546   }
3547 beach:
3548   return ret;
3549 }