urisourcebin: Refactor ChildSrcPadInfo and OutputSlot usage
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-base / gst / playback / gsturisourcebin.c
1 /* GStreamer
2  * Copyright (C) <2015> Jan Schmidt <jan@centricular.com>
3  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * SECTION:element-urisourcebin
23  * @title: urisourcebin
24  *
25  * urisourcebin is an element for accessing URIs in a uniform manner.
26  *
27  * It handles selecting a URI source element and potentially download
28  * buffering for network sources. It produces one or more source pads,
29  * depending on the input source, for feeding to decoding chains or decodebin.
30  *
31  * The main configuration is via the #GstURISourceBin:uri property.
32  *
33  * > urisourcebin is still experimental API and a technology preview.
34  * > Its behaviour and exposed API is subject to change.
35  */
36
37 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
38  * with newer GLib versions (>= 2.31.0) */
39 #define GLIB_DISABLE_DEPRECATION_WARNINGS
40
41 #ifdef HAVE_CONFIG_H
42 #  include "config.h"
43 #endif
44
45 #include <string.h>
46
47 #include <gst/gst.h>
48 #include <glib/gi18n-lib.h>
49 #include <gst/pbutils/missing-plugins.h>
50
51 #include "gstplay-enum.h"
52 #include "gstrawcaps.h"
53 #include "gstplaybackelements.h"
54 #include "gstplaybackutils.h"
55
56 #define GST_TYPE_URI_SOURCE_BIN \
57   (gst_uri_source_bin_get_type())
58 #define GST_URI_SOURCE_BIN(obj) \
59   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_URI_SOURCE_BIN,GstURISourceBin))
60 #define GST_URI_SOURCE_BIN_CLASS(klass) \
61   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_URI_SOURCE_BIN,GstURISourceBinClass))
62 #define GST_IS_URI_SOURCE_BIN(obj) \
63   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_URI_SOURCE_BIN))
64 #define GST_IS_URI_SOURCE_BIN_CLASS(klass) \
65   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_URI_SOURCE_BIN))
66 #define GST_URI_SOURCE_BIN_CAST(obj) ((GstURISourceBin *) (obj))
67
68 typedef struct _GstURISourceBin GstURISourceBin;
69 typedef struct _GstURISourceBinClass GstURISourceBinClass;
70 typedef struct _ChildSrcPadInfo ChildSrcPadInfo;
71 typedef struct _OutputSlotInfo OutputSlotInfo;
72
73 #define GST_URI_SOURCE_BIN_LOCK(urisrc) (g_mutex_lock(&((GstURISourceBin*)(urisrc))->lock))
74 #define GST_URI_SOURCE_BIN_UNLOCK(urisrc) (g_mutex_unlock(&((GstURISourceBin*)(urisrc))->lock))
75
76 #define BUFFERING_LOCK(ubin) G_STMT_START {                             \
77     GST_LOG_OBJECT (ubin,                                               \
78                     "buffering locking from thread %p",                 \
79                     g_thread_self ());                                  \
80     g_mutex_lock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock);              \
81     GST_LOG_OBJECT (ubin,                                               \
82                     "buffering lock from thread %p",                    \
83                     g_thread_self ());                                  \
84 } G_STMT_END
85
86 #define BUFFERING_UNLOCK(ubin) G_STMT_START {                           \
87     GST_LOG_OBJECT (ubin,                                               \
88                     "buffering unlocking from thread %p",               \
89                     g_thread_self ());                                  \
90     g_mutex_unlock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock);            \
91 } G_STMT_END
92
93 /* Track a source pad from the source element and the chain of (optional)
94  * elements that are linked to it up to the output slots */
95 struct _ChildSrcPadInfo
96 {
97   GstURISourceBin *urisrc;
98
99   /* Source pad this info is attached to (reffed) */
100   GstPad *src_pad;
101
102   /* An optional typefind */
103   GstElement *typefind;
104
105   /* list of output slots */
106   GList *outputs;
107 };
108
109 /* Output Slot:
110  *
111  * Handles everything related to outputing, including optional buffering.
112  */
113 struct _OutputSlotInfo
114 {
115   ChildSrcPadInfo *linked_info; /* source pad info feeding this slot */
116
117   GstPad *originating_pad;      /* Pad that created this OutputSlotInfo (ref held) */
118   GstPad *output_pad;           /* Output ghost pad */
119
120   gboolean is_eos;              /* Did EOS get fed into the buffering element */
121
122   GstElement *queue;            /* queue2 or downloadbuffer */
123   GstPad *queue_sinkpad;        /* Sink pad of the queue eleemnt */
124
125   gulong bitrate_changed_id;    /* queue bitrate changed notification */
126
127   guint demuxer_event_probe_id;
128 };
129
130 /**
131  * GstURISourceBin
132  *
133  * urisourcebin element struct
134  */
135 struct _GstURISourceBin
136 {
137   GstBin parent_instance;
138
139   GMutex lock;                  /* lock for constructing */
140
141   gchar *uri;
142   guint64 connection_speed;
143
144   gboolean activated;           /* TRUE if the switch to PAUSED has been completed */
145   gboolean flushing;            /* TRUE if switching from PAUSED to READY */
146   GCond activation_cond;        /* Uses the urisourcebin lock */
147
148   gboolean is_stream;
149   gboolean is_adaptive;
150   gboolean demuxer_handles_buffering;   /* If TRUE: Don't use buffering elements */
151   guint64 buffer_duration;      /* When buffering, buffer duration (ns) */
152   guint buffer_size;            /* When buffering, buffer size (bytes) */
153   gboolean download;
154   gboolean use_buffering;
155   gdouble low_watermark;
156   gdouble high_watermark;
157
158   GstElement *source;
159
160   GList *src_infos;             /* List of ChildSrcPadInfo for the source */
161
162   GstElement *demuxer;          /* Adaptive demuxer if any */
163
164   guint numpads;
165
166   /* for dynamic sources */
167   guint src_np_sig_id;          /* new-pad signal id */
168
169   guint64 ring_buffer_max_size; /* 0 means disabled */
170
171   GList *buffering_status;      /* element currently buffering messages */
172   gint last_buffering_pct;      /* Avoid sending buffering over and over */
173   GMutex buffering_lock;
174   GMutex buffering_post_lock;
175 };
176
177 struct _GstURISourceBinClass
178 {
179   GstBinClass parent_class;
180
181   /* emitted when all data has been drained out
182    * FIXME : What do we need this for ?? */
183   void (*drained) (GstElement * element);
184   /* emitted when all data has been fed into buffering slots (i.e the
185    * actual sources are done) */
186   void (*about_to_finish) (GstElement * element);
187 };
188
189 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
190     GST_PAD_SRC,
191     GST_PAD_SOMETIMES,
192     GST_STATIC_CAPS_ANY);
193
194 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
195
196 GST_DEBUG_CATEGORY_STATIC (gst_uri_source_bin_debug);
197 #define GST_CAT_DEFAULT gst_uri_source_bin_debug
198
199 /* signals */
200 enum
201 {
202   SIGNAL_DRAINED,
203   SIGNAL_ABOUT_TO_FINISH,
204   SIGNAL_SOURCE_SETUP,
205   LAST_SIGNAL
206 };
207
208 /* properties */
209 #define DEFAULT_PROP_URI            NULL
210 #define DEFAULT_PROP_SOURCE         NULL
211 #define DEFAULT_CONNECTION_SPEED    0
212 #define DEFAULT_BUFFER_DURATION     -1
213 #define DEFAULT_BUFFER_SIZE         -1
214 #define DEFAULT_DOWNLOAD            FALSE
215 #define DEFAULT_USE_BUFFERING       TRUE
216 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
217 #define DEFAULT_LOW_WATERMARK       0.01
218 #define DEFAULT_HIGH_WATERMARK      0.99
219
220 #define ACTUAL_DEFAULT_BUFFER_SIZE  10 * 1024 * 1024    /* The value used for byte limits when buffer-size == -1 */
221 #define ACTUAL_DEFAULT_BUFFER_DURATION  5 * GST_SECOND  /* The value used for time limits when buffer-duration == -1 */
222
223 #define GET_BUFFER_SIZE(u) ((u)->buffer_size == -1 ? ACTUAL_DEFAULT_BUFFER_SIZE : (u)->buffer_size)
224 #define GET_BUFFER_DURATION(u) ((u)->buffer_duration == -1 ? ACTUAL_DEFAULT_BUFFER_DURATION : (u)->buffer_duration)
225
226 #define DEFAULT_CAPS (gst_static_caps_get (&default_raw_caps))
227 enum
228 {
229   PROP_0,
230   PROP_URI,
231   PROP_SOURCE,
232   PROP_CONNECTION_SPEED,
233   PROP_BUFFER_SIZE,
234   PROP_BUFFER_DURATION,
235   PROP_DOWNLOAD,
236   PROP_USE_BUFFERING,
237   PROP_RING_BUFFER_MAX_SIZE,
238   PROP_LOW_WATERMARK,
239   PROP_HIGH_WATERMARK,
240   PROP_STATISTICS,
241 };
242
243 #define CUSTOM_EOS_QUARK _custom_eos_quark_get ()
244 #define CUSTOM_EOS_QUARK_DATA "custom-eos"
245 static GQuark
246 _custom_eos_quark_get (void)
247 {
248   static gsize g_quark;
249
250   if (g_once_init_enter (&g_quark)) {
251     gsize quark =
252         (gsize) g_quark_from_static_string ("urisourcebin-custom-eos");
253     g_once_init_leave (&g_quark, quark);
254   }
255   return g_quark;
256 }
257
258 static void post_missing_plugin_error (GstElement * urisrc,
259     const gchar * element_name);
260
261 static guint gst_uri_source_bin_signals[LAST_SIGNAL] = { 0 };
262
263 GType gst_uri_source_bin_get_type (void);
264 #define gst_uri_source_bin_parent_class parent_class
265 G_DEFINE_TYPE (GstURISourceBin, gst_uri_source_bin, GST_TYPE_BIN);
266
267 #define _do_init \
268     GST_DEBUG_CATEGORY_INIT (gst_uri_source_bin_debug, "urisourcebin", 0, "URI source element"); \
269     playback_element_init (plugin);
270 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (urisourcebin, "urisourcebin",
271     GST_RANK_NONE, GST_TYPE_URI_SOURCE_BIN, _do_init);
272
273 static void gst_uri_source_bin_set_property (GObject * object, guint prop_id,
274     const GValue * value, GParamSpec * pspec);
275 static void gst_uri_source_bin_get_property (GObject * object, guint prop_id,
276     GValue * value, GParamSpec * pspec);
277 static void gst_uri_source_bin_finalize (GObject * obj);
278
279 static void handle_message (GstBin * bin, GstMessage * msg);
280
281 static gboolean gst_uri_source_bin_query (GstElement * element,
282     GstQuery * query);
283 static GstStateChangeReturn gst_uri_source_bin_change_state (GstElement *
284     element, GstStateChange transition);
285
286 static void handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad,
287     GstCaps * caps);
288 static gboolean setup_typefind (ChildSrcPadInfo * info);
289 static void remove_demuxer (GstURISourceBin * bin);
290 static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad);
291 static OutputSlotInfo *new_output_slot (ChildSrcPadInfo * info,
292     gboolean do_download, gboolean is_adaptive, gboolean no_buffering,
293     GstPad * originating_pad);
294 static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc);
295 static void free_output_slot_async (GstURISourceBin * urisrc,
296     OutputSlotInfo * slot);
297 static GstPad *create_output_pad (OutputSlotInfo * slot, GstPad * pad);
298 static void remove_buffering_msgs (GstURISourceBin * bin, GstObject * src);
299
300 static void update_queue_values (GstURISourceBin * urisrc);
301 static GstStructure *get_queue_statistics (GstURISourceBin * urisrc);
302
303 static void
304 gst_uri_source_bin_class_init (GstURISourceBinClass * klass)
305 {
306   GObjectClass *gobject_class;
307   GstElementClass *gstelement_class;
308   GstBinClass *gstbin_class;
309
310   gobject_class = G_OBJECT_CLASS (klass);
311   gstelement_class = GST_ELEMENT_CLASS (klass);
312   gstbin_class = GST_BIN_CLASS (klass);
313
314   gobject_class->set_property = gst_uri_source_bin_set_property;
315   gobject_class->get_property = gst_uri_source_bin_get_property;
316   gobject_class->finalize = gst_uri_source_bin_finalize;
317
318   g_object_class_install_property (gobject_class, PROP_URI,
319       g_param_spec_string ("uri", "URI", "URI to decode",
320           DEFAULT_PROP_URI, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
321
322   g_object_class_install_property (gobject_class, PROP_SOURCE,
323       g_param_spec_object ("source", "Source", "Source object used",
324           GST_TYPE_ELEMENT, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
325
326   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
327       g_param_spec_uint64 ("connection-speed", "Connection Speed",
328           "Network connection speed in kbps (0 = unknown)",
329           0, G_MAXUINT64 / 1000, DEFAULT_CONNECTION_SPEED,
330           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
331
332   g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
333       g_param_spec_int ("buffer-size", "Buffer size (bytes)",
334           "Buffer size when buffering streams (-1 default value)",
335           -1, G_MAXINT, DEFAULT_BUFFER_SIZE,
336           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
337   g_object_class_install_property (gobject_class, PROP_BUFFER_DURATION,
338       g_param_spec_int64 ("buffer-duration", "Buffer duration (ns)",
339           "Buffer duration when buffering streams (-1 default value)",
340           -1, G_MAXINT64, DEFAULT_BUFFER_DURATION,
341           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
342
343   /**
344    * GstURISourceBin::download:
345    *
346    * For certain media type, enable download buffering.
347    */
348   g_object_class_install_property (gobject_class, PROP_DOWNLOAD,
349       g_param_spec_boolean ("download", "Download",
350           "Attempt download buffering when buffering network streams",
351           DEFAULT_DOWNLOAD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
352
353   /**
354    * GstURISourceBin::use-buffering:
355    *
356    * Perform buffering using a queue2 element, and emit BUFFERING
357    * messages based on low-/high-percent thresholds of streaming data,
358    * such as adaptive-demuxer streams.
359    *
360    * When download buffering is activated and used for the current media
361    * type, this property does nothing.
362    *
363    */
364   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
365       g_param_spec_boolean ("use-buffering", "Use Buffering",
366           "Perform buffering on demuxed/parsed media",
367           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
368
369   /**
370    * GstURISourceBin::ring-buffer-max-size
371    *
372    * The maximum size of the ring buffer in kilobytes. If set to 0, the ring
373    * buffer is disabled. Default is 0.
374    *
375    */
376   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
377       g_param_spec_uint64 ("ring-buffer-max-size",
378           "Max. ring buffer size (bytes)",
379           "Max. amount of data in the ring buffer (bytes, 0 = ring buffer disabled)",
380           0, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
381           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
382
383   /**
384    * GstURISourceBin::low-watermark
385    *
386    * Proportion of the queue size (either in bytes or time) for buffering
387    * to restart when crossed from above.  Only used if use-buffering is TRUE.
388    */
389   g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
390       g_param_spec_double ("low-watermark", "Low watermark",
391           "Low threshold for buffering to start. Only used if use-buffering is True",
392           0.0, 1.0, DEFAULT_LOW_WATERMARK,
393           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
394
395   /**
396    * GstURISourceBin::high-watermark
397    *
398    * Proportion of the queue size (either in bytes or time) to complete
399    * buffering.  Only used if use-buffering is TRUE.
400    */
401   g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
402       g_param_spec_double ("high-watermark", "High watermark",
403           "High threshold for buffering to finish. Only used if use-buffering is True",
404           0.0, 1.0, DEFAULT_HIGH_WATERMARK,
405           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
406
407   /**
408    * GstURISourceBin::statistics
409    *
410    * A GStructure containing the following values based on the values from
411    * all the queue's contained in this urisourcebin.
412    *
413    *  "minimum-byte-level"  G_TYPE_UINT               Minimum of the current byte levels
414    *  "maximum-byte-level"  G_TYPE_UINT               Maximum of the current byte levels
415    *  "average-byte-level"  G_TYPE_UINT               Average of the current byte levels
416    *  "minimum-time-level"  G_TYPE_UINT64             Minimum of the current time levels
417    *  "maximum-time-level"  G_TYPE_UINT64             Maximum of the current time levels
418    *  "average-time-level"  G_TYPE_UINT64             Average of the current time levels
419    */
420   g_object_class_install_property (gobject_class, PROP_STATISTICS,
421       g_param_spec_boxed ("statistics", "Queue Statistics",
422           "A set of statistics over all the queue-like elements contained in "
423           "this element", GST_TYPE_STRUCTURE,
424           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
425
426   /**
427    * GstURISourceBin::drained:
428    *
429    * This signal is emitted when the data for the current uri is played.
430    */
431   gst_uri_source_bin_signals[SIGNAL_DRAINED] =
432       g_signal_new ("drained", G_TYPE_FROM_CLASS (klass),
433       G_SIGNAL_RUN_LAST,
434       G_STRUCT_OFFSET (GstURISourceBinClass, drained), NULL, NULL, NULL,
435       G_TYPE_NONE, 0, G_TYPE_NONE);
436
437     /**
438    * GstURISourceBin::about-to-finish:
439    *
440    * This signal is emitted when the data for the current uri is played.
441    */
442   gst_uri_source_bin_signals[SIGNAL_ABOUT_TO_FINISH] =
443       g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
444       G_SIGNAL_RUN_LAST,
445       G_STRUCT_OFFSET (GstURISourceBinClass, about_to_finish), NULL, NULL, NULL,
446       G_TYPE_NONE, 0, G_TYPE_NONE);
447
448   /**
449    * GstURISourceBin::source-setup:
450    * @bin: the urisourcebin.
451    * @source: source element
452    *
453    * This signal is emitted after the source element has been created, so
454    * it can be configured by setting additional properties (e.g. set a
455    * proxy server for an http source, or set the device and read speed for
456    * an audio cd source). This is functionally equivalent to connecting to
457    * the notify::source signal, but more convenient.
458    *
459    * Since: 1.6.1
460    */
461   gst_uri_source_bin_signals[SIGNAL_SOURCE_SETUP] =
462       g_signal_new ("source-setup", G_TYPE_FROM_CLASS (klass),
463       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
464
465   gst_element_class_add_pad_template (gstelement_class,
466       gst_static_pad_template_get (&srctemplate));
467   gst_element_class_set_static_metadata (gstelement_class,
468       "URI reader", "Generic/Bin/Source",
469       "Download and buffer a URI as needed",
470       "Jan Schmidt <jan@centricular.com>");
471
472   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_uri_source_bin_query);
473   gstelement_class->change_state =
474       GST_DEBUG_FUNCPTR (gst_uri_source_bin_change_state);
475
476   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (handle_message);
477 }
478
479 static void
480 gst_uri_source_bin_init (GstURISourceBin * urisrc)
481 {
482   g_mutex_init (&urisrc->lock);
483
484   g_mutex_init (&urisrc->buffering_lock);
485   g_mutex_init (&urisrc->buffering_post_lock);
486
487   g_cond_init (&urisrc->activation_cond);
488
489   urisrc->uri = g_strdup (DEFAULT_PROP_URI);
490   urisrc->connection_speed = DEFAULT_CONNECTION_SPEED;
491
492   urisrc->buffer_duration = DEFAULT_BUFFER_DURATION;
493   urisrc->buffer_size = DEFAULT_BUFFER_SIZE;
494   urisrc->download = DEFAULT_DOWNLOAD;
495   urisrc->use_buffering = DEFAULT_USE_BUFFERING;
496   urisrc->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
497   urisrc->last_buffering_pct = -1;
498   urisrc->low_watermark = DEFAULT_LOW_WATERMARK;
499   urisrc->high_watermark = DEFAULT_HIGH_WATERMARK;
500
501   urisrc->demuxer_handles_buffering = FALSE;
502
503   GST_OBJECT_FLAG_SET (urisrc,
504       GST_ELEMENT_FLAG_SOURCE | GST_BIN_FLAG_STREAMS_AWARE);
505   gst_bin_set_suppressed_flags (GST_BIN (urisrc),
506       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
507 }
508
509 static void
510 gst_uri_source_bin_finalize (GObject * obj)
511 {
512   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (obj);
513
514   remove_demuxer (urisrc);
515   g_mutex_clear (&urisrc->lock);
516   g_mutex_clear (&urisrc->buffering_lock);
517   g_mutex_clear (&urisrc->buffering_post_lock);
518   g_free (urisrc->uri);
519
520   G_OBJECT_CLASS (parent_class)->finalize (obj);
521 }
522
523 static void
524 gst_uri_source_bin_set_property (GObject * object, guint prop_id,
525     const GValue * value, GParamSpec * pspec)
526 {
527   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (object);
528
529   switch (prop_id) {
530     case PROP_URI:
531       GST_OBJECT_LOCK (urisrc);
532       g_free (urisrc->uri);
533       urisrc->uri = g_value_dup_string (value);
534       GST_OBJECT_UNLOCK (urisrc);
535       break;
536     case PROP_CONNECTION_SPEED:
537       GST_OBJECT_LOCK (urisrc);
538       urisrc->connection_speed = g_value_get_uint64 (value) * 1000;
539       GST_OBJECT_UNLOCK (urisrc);
540       break;
541     case PROP_BUFFER_SIZE:
542       urisrc->buffer_size = g_value_get_int (value);
543       update_queue_values (urisrc);
544       break;
545     case PROP_BUFFER_DURATION:
546       urisrc->buffer_duration = g_value_get_int64 (value);
547       update_queue_values (urisrc);
548       break;
549     case PROP_DOWNLOAD:
550       urisrc->download = g_value_get_boolean (value);
551       break;
552     case PROP_USE_BUFFERING:
553       urisrc->use_buffering = g_value_get_boolean (value);
554       break;
555     case PROP_RING_BUFFER_MAX_SIZE:
556       urisrc->ring_buffer_max_size = g_value_get_uint64 (value);
557       break;
558     case PROP_LOW_WATERMARK:
559       urisrc->low_watermark = g_value_get_double (value);
560       update_queue_values (urisrc);
561       break;
562     case PROP_HIGH_WATERMARK:
563       urisrc->high_watermark = g_value_get_double (value);
564       update_queue_values (urisrc);
565       break;
566     default:
567       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
568       break;
569   }
570 }
571
572 static void
573 gst_uri_source_bin_get_property (GObject * object, guint prop_id,
574     GValue * value, GParamSpec * pspec)
575 {
576   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (object);
577
578   switch (prop_id) {
579     case PROP_URI:
580       GST_OBJECT_LOCK (urisrc);
581       g_value_set_string (value, urisrc->uri);
582       GST_OBJECT_UNLOCK (urisrc);
583       break;
584     case PROP_SOURCE:
585       GST_OBJECT_LOCK (urisrc);
586       g_value_set_object (value, urisrc->source);
587       GST_OBJECT_UNLOCK (urisrc);
588       break;
589     case PROP_CONNECTION_SPEED:
590       GST_OBJECT_LOCK (urisrc);
591       g_value_set_uint64 (value, urisrc->connection_speed / 1000);
592       GST_OBJECT_UNLOCK (urisrc);
593       break;
594     case PROP_BUFFER_SIZE:
595       GST_OBJECT_LOCK (urisrc);
596       g_value_set_int (value, urisrc->buffer_size);
597       GST_OBJECT_UNLOCK (urisrc);
598       break;
599     case PROP_BUFFER_DURATION:
600       GST_OBJECT_LOCK (urisrc);
601       g_value_set_int64 (value, urisrc->buffer_duration);
602       GST_OBJECT_UNLOCK (urisrc);
603       break;
604     case PROP_DOWNLOAD:
605       g_value_set_boolean (value, urisrc->download);
606       break;
607     case PROP_USE_BUFFERING:
608       g_value_set_boolean (value, urisrc->use_buffering);
609       break;
610     case PROP_RING_BUFFER_MAX_SIZE:
611       g_value_set_uint64 (value, urisrc->ring_buffer_max_size);
612       break;
613     case PROP_LOW_WATERMARK:
614       g_value_set_double (value, urisrc->low_watermark);
615       break;
616     case PROP_HIGH_WATERMARK:
617       g_value_set_double (value, urisrc->high_watermark);
618       break;
619     case PROP_STATISTICS:
620       g_value_take_boxed (value, get_queue_statistics (urisrc));
621       break;
622     default:
623       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
624       break;
625   }
626 }
627
628 static gboolean
629 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
630 {
631   GstPad *gpad = GST_PAD_CAST (user_data);
632
633   GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event);
634   gst_pad_store_sticky_event (gpad, *event);
635
636   return TRUE;
637 }
638
639 static GstPadProbeReturn
640 demux_pad_events (GstPad * pad, GstPadProbeInfo * info, OutputSlotInfo * slot);
641
642 /* CALL WITH URISOURCEBIN LOCK */
643 static void
644 free_child_src_pad_info (ChildSrcPadInfo * info, GstURISourceBin * urisrc)
645 {
646   g_assert (info->src_pad);
647
648   GST_DEBUG_OBJECT (urisrc,
649       "Freeing ChildSrcPadInfo for %" GST_PTR_FORMAT, info->src_pad);
650   if (info->typefind) {
651     gst_element_set_state (info->typefind, GST_STATE_NULL);
652     gst_bin_remove (GST_BIN_CAST (urisrc), info->typefind);
653   }
654
655   gst_object_unref (info->src_pad);
656
657   g_list_foreach (info->outputs, (GFunc) free_output_slot, urisrc);
658   g_list_free (info->outputs);
659
660   g_free (info);
661 }
662
663 static ChildSrcPadInfo *
664 get_cspi_for_pad (GstURISourceBin * urisrc, GstPad * pad)
665 {
666   GList *iter;
667
668   for (iter = urisrc->src_infos; iter; iter = iter->next) {
669     ChildSrcPadInfo *info = iter->data;
670     if (info->src_pad == pad)
671       return info;
672   }
673   return NULL;
674 }
675
676 static ChildSrcPadInfo *
677 new_child_src_pad_info (GstURISourceBin * urisrc, GstPad * pad)
678 {
679   ChildSrcPadInfo *info;
680
681   GST_LOG_OBJECT (urisrc, "New ChildSrcPadInfo for %" GST_PTR_FORMAT, pad);
682
683   info = g_new0 (ChildSrcPadInfo, 1);
684   info->urisrc = urisrc;
685   info->src_pad = gst_object_ref (pad);
686
687   urisrc->src_infos = g_list_append (urisrc->src_infos, info);
688
689   return info;
690 }
691
692 /* Called by the signal handlers when a demuxer has produced a new stream */
693 static void
694 new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
695     ChildSrcPadInfo * info)
696 {
697   GstURISourceBin *urisrc = info->urisrc;
698   OutputSlotInfo *slot;
699   GstPad *output_pad;
700
701   GST_URI_SOURCE_BIN_LOCK (urisrc);
702   /* If the demuxer handles buffering and is streams-aware, we can expose it
703      as-is directly. We still add an event probe to deal with EOS */
704   slot =
705       new_output_slot (info, FALSE, FALSE, urisrc->demuxer_handles_buffering,
706       pad);
707   output_pad = gst_object_ref (slot->output_pad);
708
709   GST_DEBUG_OBJECT (element,
710       "New streams-aware demuxer pad %s:%s , exposing directly",
711       GST_DEBUG_PAD_NAME (pad));
712   slot->demuxer_event_probe_id =
713       gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
714       GST_PAD_PROBE_TYPE_EVENT_FLUSH, (GstPadProbeCallback) demux_pad_events,
715       slot, NULL);
716
717   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
718   expose_output_pad (urisrc, output_pad);
719   gst_object_unref (output_pad);
720 }
721
722 /* Called with lock held */
723 static gboolean
724 all_slots_are_eos (GstURISourceBin * urisrc)
725 {
726   GList *tmp;
727
728   for (tmp = urisrc->src_infos; tmp; tmp = tmp->next) {
729     ChildSrcPadInfo *cspi = tmp->data;
730     GList *iter2;
731     for (iter2 = cspi->outputs; iter2; iter2 = iter2->next) {
732       OutputSlotInfo *slot = (OutputSlotInfo *) iter2->data;
733       if (slot->is_eos == FALSE)
734         return FALSE;
735     }
736   }
737   return TRUE;
738 }
739
740 /* CALL WITH URISOURCEBIN LOCK */
741 static OutputSlotInfo *
742 output_slot_for_originating_pad (ChildSrcPadInfo * info,
743     GstPad * originating_pad)
744 {
745   GList *iter;
746   for (iter = info->outputs; iter; iter = iter->next) {
747     OutputSlotInfo *slot = iter->data;
748     if (slot->originating_pad == originating_pad)
749       return slot;
750   }
751
752   return NULL;
753 }
754
755 static GstPadProbeReturn
756 demux_pad_events (GstPad * pad, GstPadProbeInfo * info, OutputSlotInfo * slot)
757 {
758   GstURISourceBin *urisrc = slot->linked_info->urisrc;
759   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
760   GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
761
762   GST_URI_SOURCE_BIN_LOCK (urisrc);
763
764   switch (GST_EVENT_TYPE (ev)) {
765     case GST_EVENT_EOS:
766     {
767       gboolean all_streams_eos;
768
769       GST_LOG_OBJECT (urisrc, "EOS on pad %" GST_PTR_FORMAT, pad);
770
771       BUFFERING_LOCK (urisrc);
772       /* Mark that we fed an EOS to this slot */
773       slot->is_eos = TRUE;
774       all_streams_eos = all_slots_are_eos (urisrc);
775       BUFFERING_UNLOCK (urisrc);
776
777       if (slot->queue)
778         /* EOS means this element is no longer buffering */
779         remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
780
781       if (all_streams_eos) {
782         GST_DEBUG_OBJECT (urisrc, "POSTING ABOUT TO FINISH");
783         g_signal_emit (urisrc,
784             gst_uri_source_bin_signals[SIGNAL_ABOUT_TO_FINISH], 0, NULL);
785       }
786     }
787       break;
788     case GST_EVENT_STREAM_START:
789     case GST_EVENT_FLUSH_STOP:
790       BUFFERING_LOCK (urisrc);
791       slot->is_eos = FALSE;
792       BUFFERING_UNLOCK (urisrc);
793       break;
794     default:
795       break;
796   }
797
798   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
799
800   return ret;
801 }
802
803 static GstPadProbeReturn
804 pre_queue_event_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
805 {
806   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
807   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
808   GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
809
810   switch (GST_EVENT_TYPE (ev)) {
811     case GST_EVENT_EOS:
812     {
813       GST_LOG_OBJECT (urisrc, "EOS on pad %" GST_PTR_FORMAT, pad);
814       GST_DEBUG_OBJECT (urisrc, "POSTING ABOUT TO FINISH");
815       g_signal_emit (urisrc,
816           gst_uri_source_bin_signals[SIGNAL_ABOUT_TO_FINISH], 0, NULL);
817     }
818       break;
819     default:
820       break;
821   }
822   return ret;
823 }
824
825 static GstStructure *
826 get_queue_statistics (GstURISourceBin * urisrc)
827 {
828   GstStructure *ret = NULL;
829   guint min_byte_level = 0, max_byte_level = 0;
830   guint64 min_time_level = 0, max_time_level = 0;
831   gdouble avg_byte_level = 0., avg_time_level = 0.;
832   guint i = 0;
833   GList *iter, *cur;
834
835   GST_URI_SOURCE_BIN_LOCK (urisrc);
836
837   for (iter = urisrc->src_infos; iter; iter = iter->next) {
838     ChildSrcPadInfo *info = iter->data;
839     for (cur = info->outputs; cur; cur = cur->next) {
840       OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
841       guint byte_limit = 0;
842       guint64 time_limit = 0;
843
844       if (!slot->queue)
845         continue;
846
847       g_object_get (slot->queue, "current-level-bytes", &byte_limit,
848           "current-level-time", &time_limit, NULL);
849
850       if (byte_limit < min_byte_level)
851         min_byte_level = byte_limit;
852       if (byte_limit > max_byte_level)
853         max_byte_level = byte_limit;
854       avg_byte_level = (avg_byte_level * i + byte_limit) / (gdouble) (i + 1);
855
856       if (time_limit < min_time_level)
857         min_time_level = time_limit;
858       if (time_limit > max_time_level)
859         max_time_level = time_limit;
860       avg_time_level = (avg_time_level * i + time_limit) / (gdouble) (i + 1);
861
862       i++;
863     }
864   }
865   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
866
867   ret = gst_structure_new ("application/x-urisourcebin-stats",
868       "minimum-byte-level", G_TYPE_UINT, (guint) min_byte_level,
869       "maximum-byte-level", G_TYPE_UINT, (guint) max_byte_level,
870       "average-byte-level", G_TYPE_UINT, (guint) avg_byte_level,
871       "minimum-time-level", G_TYPE_UINT64, (guint64) min_time_level,
872       "maximum-time-level", G_TYPE_UINT64, (guint64) max_time_level,
873       "average-time-level", G_TYPE_UINT64, (guint64) avg_time_level, NULL);
874
875   return ret;
876 }
877
878 static void
879 update_queue_values (GstURISourceBin * urisrc)
880 {
881   gint64 duration;
882   guint buffer_size;
883   gdouble low_watermark, high_watermark;
884   guint64 cumulative_bitrate = 0;
885   GList *iter, *cur;
886
887   GST_URI_SOURCE_BIN_LOCK (urisrc);
888   duration = GET_BUFFER_DURATION (urisrc);
889   buffer_size = GET_BUFFER_SIZE (urisrc);
890   low_watermark = urisrc->low_watermark;
891   high_watermark = urisrc->high_watermark;
892
893   for (iter = urisrc->src_infos; iter; iter = iter->next) {
894     ChildSrcPadInfo *info = iter->data;
895     for (cur = info->outputs; cur; cur = cur->next) {
896       OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
897       guint64 bitrate = 0;
898
899       if (!slot->queue)
900         continue;
901
902       if (g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
903               "bitrate")) {
904         g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
905       }
906
907       if (bitrate > 0)
908         cumulative_bitrate += bitrate;
909       else {
910         GST_TRACE_OBJECT (urisrc,
911             "Unknown bitrate detected from %" GST_PTR_FORMAT
912             ", resetting all bitrates", slot->queue);
913         cumulative_bitrate = 0;
914         break;
915       }
916     }
917   }
918
919   GST_DEBUG_OBJECT (urisrc, "recalculating queue limits with cumulative "
920       "bitrate %" G_GUINT64_FORMAT ", buffer size %u, buffer duration %"
921       G_GINT64_FORMAT, cumulative_bitrate, buffer_size, duration);
922
923   for (iter = urisrc->src_infos; iter; iter = iter->next) {
924     ChildSrcPadInfo *info = iter->data;
925     for (cur = info->outputs; cur; cur = cur->next) {
926       OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
927       guint byte_limit;
928
929       if (!slot->queue)
930         continue;
931
932       if (cumulative_bitrate > 0
933           && g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
934               "bitrate")) {
935         guint64 bitrate;
936         g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
937         byte_limit =
938             gst_util_uint64_scale (buffer_size, bitrate, cumulative_bitrate);
939       } else {
940         /* if not all queue's have valid bitrates, use the buffer-size as the
941          * limit */
942         byte_limit = buffer_size;
943       }
944
945       GST_DEBUG_OBJECT (urisrc,
946           "calculated new limits for queue-like element %" GST_PTR_FORMAT
947           ", bytes:%u, time:%" G_GUINT64_FORMAT
948           ", low-watermark:%f, high-watermark:%f",
949           slot->queue, byte_limit, (guint64) duration, low_watermark,
950           high_watermark);
951       g_object_set (G_OBJECT (slot->queue), "max-size-bytes", byte_limit,
952           "max-size-time", (guint64) duration, "low-watermark", low_watermark,
953           "high-watermark", high_watermark, NULL);
954     }
955   }
956   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
957 }
958
959 static void
960 on_queue_bitrate_changed (GstElement * queue, GParamSpec * pspec,
961     gpointer user_data)
962 {
963   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
964
965   gst_element_call_async (GST_ELEMENT (urisrc),
966       (GstElementCallAsyncFunc) update_queue_values, NULL, NULL);
967 }
968
969 /* Called with lock held */
970 static OutputSlotInfo *
971 new_output_slot (ChildSrcPadInfo * info, gboolean do_download,
972     gboolean is_adaptive, gboolean no_buffering, GstPad * originating_pad)
973 {
974   GstURISourceBin *urisrc = info->urisrc;
975   OutputSlotInfo *slot;
976   GstPad *srcpad;
977   GstElement *queue = NULL;
978   const gchar *elem_name;
979
980   GST_DEBUG_OBJECT (urisrc,
981       "do_download:%d is_adaptive:%d, no_buffering:%d, originating_pad:%"
982       GST_PTR_FORMAT, do_download, is_adaptive, no_buffering, originating_pad);
983
984   slot = g_new0 (OutputSlotInfo, 1);
985   slot->linked_info = info;
986
987   /* If buffering is required, create the element */
988   if (!no_buffering) {
989     if (do_download)
990       elem_name = "downloadbuffer";
991     else
992       elem_name = "queue2";
993
994     queue = gst_element_factory_make (elem_name, NULL);
995     if (!queue)
996       goto no_buffer_element;
997
998     slot->queue = queue;
999
1000     slot->bitrate_changed_id =
1001         g_signal_connect (G_OBJECT (queue), "notify::bitrate",
1002         (GCallback) on_queue_bitrate_changed, urisrc);
1003
1004     if (do_download) {
1005       gchar *temp_template, *filename;
1006       const gchar *tmp_dir, *prgname;
1007
1008       tmp_dir = g_get_user_cache_dir ();
1009       prgname = g_get_prgname ();
1010       if (prgname == NULL)
1011         prgname = "GStreamer";
1012
1013       filename = g_strdup_printf ("%s-XXXXXX", prgname);
1014
1015       /* build our filename */
1016       temp_template = g_build_filename (tmp_dir, filename, NULL);
1017
1018       GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
1019           temp_template, tmp_dir, prgname, filename);
1020
1021       /* configure progressive download for selected media types */
1022       g_object_set (queue, "temp-template", temp_template, NULL);
1023
1024       g_free (filename);
1025       g_free (temp_template);
1026     } else {
1027       if (is_adaptive) {
1028         GST_LOG_OBJECT (urisrc, "Adding queue2 for adaptive streaming stream");
1029         g_object_set (queue, "use-buffering", urisrc->use_buffering,
1030             "use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL);
1031       } else {
1032         GST_LOG_OBJECT (urisrc, "Adding queue for buffering");
1033         g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
1034       }
1035
1036       g_object_set (queue, "ring-buffer-max-size",
1037           urisrc->ring_buffer_max_size, NULL);
1038       /* Disable max-size-buffers - queue based on data rate to the default time limit */
1039       g_object_set (queue, "max-size-buffers", 0, NULL);
1040
1041       /* Don't start buffering until the queue is empty (< 1%).
1042        * Start playback when the queue is 60% full, leaving a bit more room
1043        * for upstream to push more without getting bursty */
1044       g_object_set (queue, "low-percent", 1, "high-percent", 60, NULL);
1045
1046       g_object_set (queue, "low-watermark", urisrc->low_watermark,
1047           "high-watermark", urisrc->high_watermark, NULL);
1048     }
1049
1050     /* set the necessary limits on the queue-like elements */
1051     g_object_set (queue, "max-size-bytes", GET_BUFFER_SIZE (urisrc),
1052         "max-size-time", (guint64) GET_BUFFER_DURATION (urisrc), NULL);
1053
1054     gst_bin_add (GST_BIN_CAST (urisrc), queue);
1055     gst_element_sync_state_with_parent (queue);
1056
1057     slot->queue_sinkpad = gst_element_get_static_pad (queue, "sink");
1058
1059     /* get the new raw srcpad */
1060     srcpad = gst_element_get_static_pad (queue, "src");
1061
1062     slot->output_pad = create_output_pad (slot, srcpad);
1063
1064     gst_object_unref (srcpad);
1065
1066     gst_pad_link (originating_pad, slot->queue_sinkpad);
1067   } else {
1068     /* Expose pad directly */
1069     slot->output_pad = create_output_pad (slot, originating_pad);
1070   }
1071   slot->originating_pad = gst_object_ref (originating_pad);
1072
1073   /* save output slot so we can remove it later */
1074   info->outputs = g_list_append (info->outputs, slot);
1075
1076   GST_DEBUG_OBJECT (urisrc, "New slot for output_pad %" GST_PTR_FORMAT,
1077       slot->output_pad);
1078
1079   return slot;
1080
1081 no_buffer_element:
1082   {
1083     g_free (slot);
1084     post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), elem_name);
1085     return NULL;
1086   }
1087 }
1088
1089 static GstPadProbeReturn
1090 source_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
1091     gpointer user_data)
1092 {
1093   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
1094   OutputSlotInfo *slot = user_data;
1095   GstURISourceBin *urisrc = slot->linked_info->urisrc;
1096
1097   GST_LOG_OBJECT (pad, "%" GST_PTR_FORMAT, event);
1098
1099   /* A custom EOS will be received if an adaptive demuxer source pad removed a
1100    * pad and buffering was present on that slot */
1101   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS &&
1102       gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (event),
1103           CUSTOM_EOS_QUARK)) {
1104     GstPadProbeReturn probe_ret = GST_PAD_PROBE_DROP;
1105
1106     GST_DEBUG_OBJECT (pad, "we received custom EOS");
1107
1108     /* remove custom-eos */
1109     gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (event), CUSTOM_EOS_QUARK,
1110         NULL, NULL);
1111
1112     GST_URI_SOURCE_BIN_LOCK (urisrc);
1113
1114     if (slot->is_eos) {
1115       /* linked_info is old input which is still linked without removal */
1116       GST_DEBUG_OBJECT (pad, "push actual EOS");
1117       gst_pad_push_event (slot->output_pad, event);
1118       probe_ret = GST_PAD_PROBE_HANDLED;
1119     }
1120
1121     /* And finally remove the output. This is done asynchronously since we can't
1122      * do it from the streaming thread */
1123     free_output_slot_async (urisrc, slot);
1124
1125     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1126     return probe_ret;
1127   }
1128   /* never drop events */
1129   return GST_PAD_PROBE_OK;
1130 }
1131
1132 /* called when we found a raw pad to expose. We set up a
1133  * padprobe to detect EOS before exposing the pad.
1134  * Called with LOCK held. */
1135 static GstPad *
1136 create_output_pad (OutputSlotInfo * slot, GstPad * pad)
1137 {
1138   GstURISourceBin *urisrc = slot->linked_info->urisrc;
1139   GstPad *newpad;
1140   GstPadTemplate *pad_tmpl;
1141   gchar *padname;
1142
1143   /* If the output slot does buffering, add a probe to detect drainage */
1144   if (slot->queue)
1145     gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
1146         source_pad_event_probe, slot, NULL);
1147
1148   pad_tmpl = gst_static_pad_template_get (&srctemplate);
1149
1150   padname = g_strdup_printf ("src_%u", urisrc->numpads);
1151   urisrc->numpads++;
1152
1153   newpad = gst_ghost_pad_new_from_template (padname, pad, pad_tmpl);
1154   gst_object_unref (pad_tmpl);
1155   g_free (padname);
1156
1157   GST_DEBUG_OBJECT (urisrc, "Created output pad %s:%s for pad %s:%s",
1158       GST_DEBUG_PAD_NAME (newpad), GST_DEBUG_PAD_NAME (pad));
1159
1160   return newpad;
1161 }
1162
1163 static GstPadProbeReturn
1164 expose_block_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1165 {
1166   GstURISourceBin *urisrc = (GstURISourceBin *) user_data;
1167   gboolean expose = FALSE;
1168
1169   GST_DEBUG_OBJECT (pad, "blocking");
1170
1171   GST_URI_SOURCE_BIN_LOCK (urisrc);
1172   while (!urisrc->activated && !urisrc->flushing) {
1173     GST_DEBUG_OBJECT (urisrc, "activated:%d flushing:%d", urisrc->activated,
1174         urisrc->flushing);
1175     g_cond_wait (&urisrc->activation_cond, &urisrc->lock);
1176   }
1177   GST_DEBUG_OBJECT (urisrc, "activated:%d flushing:%d", urisrc->activated,
1178       urisrc->flushing);
1179
1180   if (!urisrc->flushing)
1181     expose = TRUE;
1182   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1183   if (expose)
1184     gst_element_add_pad (GST_ELEMENT_CAST (urisrc), pad);
1185   GST_DEBUG_OBJECT (pad, "Done blocking, removing probe");
1186   return GST_PAD_PROBE_REMOVE;
1187 }
1188
1189 static void
1190 expose_output_pad (GstURISourceBin * urisrc, GstPad * pad)
1191 {
1192   GstPad *target;
1193
1194   if (gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (urisrc)))
1195     return;                     /* Pad is already exposed */
1196
1197   target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1198
1199   gst_pad_sticky_events_foreach (target, copy_sticky_events, pad);
1200   gst_object_unref (target);
1201
1202   gst_pad_set_active (pad, TRUE);
1203   GST_URI_SOURCE_BIN_LOCK (urisrc);
1204   if (!urisrc->activated) {
1205     GST_DEBUG_OBJECT (urisrc, "Not fully activated, adding pad once PAUSED !");
1206     gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
1207         expose_block_probe, urisrc, NULL);
1208     pad = NULL;
1209   }
1210   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1211
1212   if (pad) {
1213     GST_DEBUG_OBJECT (urisrc, "Exposing pad %" GST_PTR_FORMAT, pad);
1214     gst_element_add_pad (GST_ELEMENT_CAST (urisrc), pad);
1215   }
1216 }
1217
1218 static void
1219 demuxer_pad_removed_cb (GstElement * element, GstPad * pad,
1220     ChildSrcPadInfo * info)
1221 {
1222   GstURISourceBin *urisrc;
1223   OutputSlotInfo *slot;
1224
1225   /* we only care about srcpads */
1226   if (!GST_PAD_IS_SRC (pad))
1227     return;
1228
1229   urisrc = info->urisrc;
1230
1231   GST_DEBUG_OBJECT (urisrc, "pad removed name: <%s:%s>",
1232       GST_DEBUG_PAD_NAME (pad));
1233
1234   GST_URI_SOURCE_BIN_LOCK (urisrc);
1235   slot = output_slot_for_originating_pad (info, pad);
1236   g_assert (slot);
1237
1238   gst_pad_remove_probe (pad, slot->demuxer_event_probe_id);
1239   slot->demuxer_event_probe_id = 0;
1240
1241   if (slot->queue) {
1242     gboolean was_eos;
1243
1244     /* Propagate custom EOS to buffering elements. The slot will be removed when
1245      * it is received on the output of the buffering elements */
1246
1247     BUFFERING_LOCK (urisrc);
1248     /* Unlink this pad from its output slot and send a fake EOS event
1249      * to drain the queue */
1250     was_eos = slot->is_eos;
1251     slot->is_eos = TRUE;
1252     BUFFERING_UNLOCK (urisrc);
1253
1254     remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
1255     if (!was_eos) {
1256       GstStructure *s;
1257       GstEvent *event;
1258       event = gst_event_new_eos ();
1259       s = gst_event_writable_structure (event);
1260       gst_structure_set (s, "urisourcebin-custom-eos", G_TYPE_BOOLEAN, TRUE,
1261           NULL);
1262       gst_pad_send_event (slot->queue_sinkpad, event);
1263     }
1264   } else {
1265     GST_LOG_OBJECT (urisrc,
1266         "No buffering involved, removing output slot immediately");
1267     /* Remove output slot immediately */
1268     info->outputs = g_list_remove (info->outputs, slot);
1269     free_output_slot (slot, urisrc);
1270   }
1271   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1272
1273   return;
1274 }
1275
1276 /* helper function to lookup stuff in lists */
1277 static gboolean
1278 array_has_value (const gchar * values[], const gchar * value)
1279 {
1280   gint i;
1281
1282   for (i = 0; values[i]; i++) {
1283     if (g_str_has_prefix (value, values[i]))
1284       return TRUE;
1285   }
1286   return FALSE;
1287 }
1288
1289 static gboolean
1290 array_has_uri_value (const gchar * values[], const gchar * value)
1291 {
1292   gint i;
1293
1294   for (i = 0; values[i]; i++) {
1295     if (!g_ascii_strncasecmp (value, values[i], strlen (values[i])))
1296       return TRUE;
1297   }
1298   return FALSE;
1299 }
1300
1301 /* list of URIs that we consider to be streams and that need buffering.
1302  * We have no mechanism yet to figure this out with a query. */
1303 static const gchar *stream_uris[] = { "http://", "https://", "mms://",
1304   "mmsh://", "mmsu://", "mmst://", "fd://", "myth://", "ssh://",
1305   "ftp://", "sftp://",
1306   NULL
1307 };
1308
1309 /* list of URIs that need a queue because they are pretty bursty */
1310 static const gchar *queue_uris[] = { "cdda://", NULL };
1311
1312 /* blacklisted URIs, we know they will always fail. */
1313 static const gchar *blacklisted_uris[] = { NULL };
1314
1315 /* media types that use adaptive streaming */
1316 static const gchar *adaptive_media[] = {
1317   "application/x-hls", "application/vnd.ms-sstr+xml",
1318   "application/dash+xml", NULL
1319 };
1320
1321 #define IS_STREAM_URI(uri)          (array_has_uri_value (stream_uris, uri))
1322 #define IS_QUEUE_URI(uri)           (array_has_uri_value (queue_uris, uri))
1323 #define IS_BLACKLISTED_URI(uri)     (array_has_uri_value (blacklisted_uris, uri))
1324 #define IS_ADAPTIVE_MEDIA(media)    (array_has_value (adaptive_media, media))
1325
1326 /*
1327  * Generate and configure a source element.
1328  */
1329 static GstElement *
1330 gen_source_element (GstURISourceBin * urisrc)
1331 {
1332   GObjectClass *source_class;
1333   GstElement *source;
1334   GParamSpec *pspec;
1335   GstQuery *query;
1336   GstSchedulingFlags flags;
1337   GError *err = NULL;
1338
1339   if (!urisrc->uri)
1340     goto no_uri;
1341
1342   GST_LOG_OBJECT (urisrc, "finding source for %s", urisrc->uri);
1343
1344   if (!gst_uri_is_valid (urisrc->uri))
1345     goto invalid_uri;
1346
1347   if (IS_BLACKLISTED_URI (urisrc->uri))
1348     goto uri_blacklisted;
1349
1350   source = gst_element_make_from_uri (GST_URI_SRC, urisrc->uri, NULL, &err);
1351   if (!source)
1352     goto no_source;
1353
1354   GST_LOG_OBJECT (urisrc, "found source type %s", G_OBJECT_TYPE_NAME (source));
1355
1356   urisrc->is_stream = IS_STREAM_URI (urisrc->uri);
1357
1358   query = gst_query_new_scheduling ();
1359   if (gst_element_query (source, query)) {
1360     gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
1361     if ((flags & GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED))
1362       urisrc->is_stream = TRUE;
1363   }
1364   gst_query_unref (query);
1365
1366   source_class = G_OBJECT_GET_CLASS (source);
1367
1368   if (urisrc->is_stream) {
1369     /* Live sources are not streamable */
1370     pspec = g_object_class_find_property (source_class, "is-live");
1371     if (pspec && G_PARAM_SPEC_VALUE_TYPE (pspec) == G_TYPE_BOOLEAN) {
1372       gboolean is_live;
1373       g_object_get (G_OBJECT (source), "is-live", &is_live, NULL);
1374       if (is_live)
1375         urisrc->is_stream = FALSE;
1376     }
1377   }
1378
1379   GST_LOG_OBJECT (urisrc, "source is stream: %d", urisrc->is_stream);
1380
1381   pspec = g_object_class_find_property (source_class, "connection-speed");
1382   if (pspec != NULL) {
1383     guint64 speed = urisrc->connection_speed / 1000;
1384     gboolean wrong_type = FALSE;
1385
1386     if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_UINT) {
1387       GParamSpecUInt *pspecuint = G_PARAM_SPEC_UINT (pspec);
1388
1389       speed = CLAMP (speed, pspecuint->minimum, pspecuint->maximum);
1390     } else if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_INT) {
1391       GParamSpecInt *pspecint = G_PARAM_SPEC_INT (pspec);
1392
1393       speed = CLAMP (speed, pspecint->minimum, pspecint->maximum);
1394     } else if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_UINT64) {
1395       GParamSpecUInt64 *pspecuint = G_PARAM_SPEC_UINT64 (pspec);
1396
1397       speed = CLAMP (speed, pspecuint->minimum, pspecuint->maximum);
1398     } else if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_INT64) {
1399       GParamSpecInt64 *pspecint = G_PARAM_SPEC_INT64 (pspec);
1400
1401       speed = CLAMP (speed, pspecint->minimum, pspecint->maximum);
1402     } else {
1403       GST_WARNING_OBJECT (urisrc,
1404           "The connection speed property %" G_GUINT64_FORMAT
1405           " of type %s is not useful. Not setting it", speed,
1406           g_type_name (G_PARAM_SPEC_TYPE (pspec)));
1407       wrong_type = TRUE;
1408     }
1409
1410     if (!wrong_type) {
1411       g_object_set (source, "connection-speed", speed, NULL);
1412
1413       GST_DEBUG_OBJECT (urisrc,
1414           "setting connection-speed=%" G_GUINT64_FORMAT " to source element",
1415           speed);
1416     }
1417   }
1418
1419   return source;
1420
1421   /* ERRORS */
1422 no_uri:
1423   {
1424     GST_ELEMENT_ERROR (urisrc, RESOURCE, NOT_FOUND,
1425         (_("No URI specified to play from.")), (NULL));
1426     return NULL;
1427   }
1428 invalid_uri:
1429   {
1430     GST_ELEMENT_ERROR (urisrc, RESOURCE, NOT_FOUND,
1431         (_("Invalid URI \"%s\"."), urisrc->uri), (NULL));
1432     g_clear_error (&err);
1433     return NULL;
1434   }
1435 uri_blacklisted:
1436   {
1437     GST_ELEMENT_ERROR (urisrc, RESOURCE, FAILED,
1438         (_("This stream type cannot be played yet.")), (NULL));
1439     return NULL;
1440   }
1441 no_source:
1442   {
1443     /* whoops, could not create the source element, dig a little deeper to
1444      * figure out what might be wrong. */
1445     if (err != NULL && err->code == GST_URI_ERROR_UNSUPPORTED_PROTOCOL) {
1446       gchar *prot;
1447
1448       prot = gst_uri_get_protocol (urisrc->uri);
1449       if (prot == NULL)
1450         goto invalid_uri;
1451
1452       gst_element_post_message (GST_ELEMENT_CAST (urisrc),
1453           gst_missing_uri_source_message_new (GST_ELEMENT (urisrc), prot));
1454
1455       GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN,
1456           (_("No URI handler implemented for \"%s\"."), prot), (NULL));
1457
1458       g_free (prot);
1459     } else {
1460       GST_ELEMENT_ERROR (urisrc, RESOURCE, NOT_FOUND,
1461           ("%s", (err) ? err->message : "URI was not accepted by any element"),
1462           ("No element accepted URI '%s'", urisrc->uri));
1463     }
1464
1465     g_clear_error (&err);
1466     return NULL;
1467   }
1468 }
1469
1470 static gboolean
1471 is_all_raw_caps (GstCaps * caps, GstCaps * rawcaps, gboolean * all_raw)
1472 {
1473   GstCaps *intersection;
1474   gint capssize;
1475   gboolean res = FALSE;
1476
1477   if (caps == NULL)
1478     return FALSE;
1479
1480   capssize = gst_caps_get_size (caps);
1481   /* no caps, skip and move to the next pad */
1482   if (capssize == 0 || gst_caps_is_empty (caps) || gst_caps_is_any (caps))
1483     goto done;
1484
1485   intersection = gst_caps_intersect (caps, rawcaps);
1486   *all_raw = !gst_caps_is_empty (intersection)
1487       && (gst_caps_get_size (intersection) == capssize);
1488   gst_caps_unref (intersection);
1489
1490   res = TRUE;
1491
1492 done:
1493   return res;
1494 }
1495
1496 static void
1497 post_missing_plugin_error (GstElement * urisrc, const gchar * element_name)
1498 {
1499   GstMessage *msg;
1500
1501   msg = gst_missing_element_message_new (urisrc, element_name);
1502   gst_element_post_message (urisrc, msg);
1503
1504   GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN,
1505       (_("Missing element '%s' - check your GStreamer installation."),
1506           element_name), (NULL));
1507 }
1508
1509 typedef struct
1510 {
1511   GstURISourceBin *urisrc;
1512   gboolean have_out;
1513   gboolean res;
1514 } AnalyseData;
1515
1516 static void
1517 analyse_pad_foreach (const GValue * item, AnalyseData * data)
1518 {
1519   GstURISourceBin *urisrc = data->urisrc;
1520   GstPad *pad = g_value_dup_object (item);
1521   ChildSrcPadInfo *info;
1522   GstCaps *padcaps = NULL;
1523   gboolean pad_is_raw;
1524   gboolean res = TRUE;
1525
1526   GST_LOG_OBJECT (urisrc, "pad %" GST_PTR_FORMAT, pad);
1527
1528   data->have_out = TRUE;
1529
1530   /* The info might already exist if there was an iterator resync */
1531   if (get_cspi_for_pad (urisrc, pad)) {
1532     GST_LOG_OBJECT (urisrc, "Already analysed");
1533     goto out;
1534   }
1535
1536   info = new_child_src_pad_info (urisrc, pad);
1537   padcaps = gst_pad_query_caps (pad, NULL);
1538
1539   if (!is_all_raw_caps (padcaps, DEFAULT_CAPS, &pad_is_raw) || !pad_is_raw) {
1540     /* if FALSE, this pad has no caps, we setup typefinding on it */
1541     if (!setup_typefind (info)) {
1542       res = FALSE;
1543       goto out;
1544     }
1545   } else if (pad_is_raw) {
1546     /* caps on source pad are all raw, we can add the pad */
1547     GstPad *output_pad;
1548     OutputSlotInfo *slot;
1549
1550     GST_URI_SOURCE_BIN_LOCK (urisrc);
1551     /* Only use buffering on raw pads in very specific conditions */
1552     GST_DEBUG_OBJECT (urisrc, "use_buffering:%d is_queue:%d",
1553         urisrc->use_buffering, IS_QUEUE_URI (urisrc->uri));
1554     slot = new_output_slot (info, FALSE, FALSE, !urisrc->use_buffering
1555         || !IS_QUEUE_URI (urisrc->uri), pad);
1556
1557     if (!slot) {
1558       res = FALSE;
1559       GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1560       goto out;
1561     }
1562
1563     /* get the new raw srcpad */
1564     output_pad = gst_object_ref (slot->output_pad);
1565
1566     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1567
1568     expose_output_pad (urisrc, output_pad);
1569     gst_object_unref (output_pad);
1570   } else {
1571     GST_DEBUG_OBJECT (urisrc, "Handling non-raw pad");
1572     /* The caps are non-raw, we handle it directly */
1573     handle_new_pad (info, pad, padcaps);
1574   }
1575
1576 out:
1577   if (padcaps)
1578     gst_caps_unref (padcaps);
1579   gst_object_unref (pad);
1580   data->res &= res;
1581 }
1582
1583 /**
1584  * analyse_source_and_expose_raw_pads:
1585  * @urisrc: a #GstURISourceBin
1586  * @all_pads_raw: are all pads raw data
1587  * @have_out: does the source have output
1588  * @is_dynamic: is this a dynamic source
1589  *
1590  * Check the source of @urisrc and collect information about it.
1591  *
1592  * All pads will be handled directly. Raw pads are exposed as-is. Pads without
1593  * any caps will have a typefind appended to them, and other pads will be
1594  * analysed further.
1595  *
1596  * @is_raw will be set to TRUE if the source only produces raw pads. When this
1597  * function returns, all of the raw pad of the source will be added
1598  * to @urisrc
1599  *
1600  * @have_out: will be set to TRUE if the source has output pads.
1601  *
1602  * @is_dynamic: TRUE if the element will create (more) pads dynamically later
1603  * on.
1604  *
1605  * Returns: FALSE if a fatal error occurred while scanning.
1606  */
1607 static gboolean
1608 analyse_source_and_expose_raw_pads (GstURISourceBin * urisrc,
1609     gboolean * have_out, gboolean * is_dynamic)
1610 {
1611   GstElementClass *elemclass;
1612   AnalyseData data = { 0, };
1613   GstIteratorResult iterres;
1614   GList *walk;
1615   GstIterator *pads_iter;
1616   gboolean res = TRUE;
1617
1618   pads_iter = gst_element_iterate_src_pads (urisrc->source);
1619
1620 restart:
1621   data.res = TRUE;
1622   data.have_out = FALSE;
1623   data.urisrc = urisrc;
1624   iterres =
1625       gst_iterator_foreach (pads_iter,
1626       (GstIteratorForeachFunction) analyse_pad_foreach, &data);
1627   if (iterres == GST_ITERATOR_RESYNC)
1628     goto restart;
1629   if (iterres == GST_ITERATOR_ERROR)
1630     res = FALSE;
1631   else
1632     res = data.res;
1633   gst_iterator_free (pads_iter);
1634
1635   /* check for padtemplates that list SOMETIMES pads to
1636    * determine if the element is dynamic. */
1637   *is_dynamic = FALSE;
1638   elemclass = GST_ELEMENT_GET_CLASS (urisrc->source);
1639   walk = gst_element_class_get_pad_template_list (elemclass);
1640   while (walk != NULL) {
1641     GstPadTemplate *templ;
1642
1643     templ = (GstPadTemplate *) walk->data;
1644     if (GST_PAD_TEMPLATE_DIRECTION (templ) == GST_PAD_SRC) {
1645       if (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_SOMETIMES)
1646         *is_dynamic = TRUE;
1647       break;
1648     }
1649     walk = g_list_next (walk);
1650   }
1651
1652   *have_out = data.have_out;
1653
1654   return res;
1655 }
1656
1657 /* Remove any adaptive demuxer element */
1658 static void
1659 remove_demuxer (GstURISourceBin * bin)
1660 {
1661   if (bin->demuxer) {
1662     GST_DEBUG_OBJECT (bin, "removing old demuxer element");
1663     gst_element_set_state (bin->demuxer, GST_STATE_NULL);
1664     gst_bin_remove (GST_BIN_CAST (bin), bin->demuxer);
1665     bin->demuxer = NULL;
1666     bin->demuxer_handles_buffering = FALSE;
1667   }
1668 }
1669
1670 /* make a demuxer and connect to all the signals */
1671 static GstElement *
1672 make_demuxer (GstURISourceBin * urisrc, ChildSrcPadInfo * info, GstCaps * caps)
1673 {
1674   GList *factories, *eligible, *cur;
1675   GstElement *demuxer = NULL;
1676   GParamSpec *pspec;
1677
1678   GST_LOG_OBJECT (urisrc, "making new adaptive demuxer");
1679
1680   /* now create the demuxer element */
1681
1682   /* FIXME: Fire a signal to get the demuxer? */
1683   factories = gst_element_factory_list_get_elements
1684       (GST_ELEMENT_FACTORY_TYPE_DEMUXER, GST_RANK_MARGINAL);
1685   eligible =
1686       gst_element_factory_list_filter (factories, caps, GST_PAD_SINK,
1687       gst_caps_is_fixed (caps));
1688   gst_plugin_feature_list_free (factories);
1689
1690   if (eligible == NULL)
1691     goto no_demuxer;
1692
1693   eligible = g_list_sort (eligible, gst_plugin_feature_rank_compare_func);
1694
1695   for (cur = eligible; cur != NULL; cur = g_list_next (cur)) {
1696     GstElementFactory *factory = (GstElementFactory *) (cur->data);
1697     const gchar *klass =
1698         gst_element_factory_get_metadata (factory, GST_ELEMENT_METADATA_KLASS);
1699
1700     /* Can't be a demuxer unless it has Demux in the klass name */
1701     if (!strstr (klass, "Demux") || !strstr (klass, "Adaptive"))
1702       continue;
1703
1704     demuxer = gst_element_factory_create (factory, NULL);
1705     if (!GST_OBJECT_FLAG_IS_SET (demuxer, GST_BIN_FLAG_STREAMS_AWARE)) {
1706       GST_DEBUG_OBJECT (urisrc, "Ignoring non-streams-aware adaptive demuxer");
1707       gst_object_unref (demuxer);
1708       continue;
1709     }
1710     break;
1711   }
1712   gst_plugin_feature_list_free (eligible);
1713
1714   if (!demuxer)
1715     goto no_demuxer;
1716
1717   GST_DEBUG_OBJECT (urisrc, "Created adaptive demuxer %" GST_PTR_FORMAT,
1718       demuxer);
1719
1720   /* set up callbacks to create the links between
1721    * demuxer streams and output */
1722   g_signal_connect (demuxer,
1723       "pad-added", G_CALLBACK (new_demuxer_pad_added_cb), info);
1724   g_signal_connect (demuxer,
1725       "pad-removed", G_CALLBACK (demuxer_pad_removed_cb), info);
1726
1727   /* Propagate connection-speed property */
1728   pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (demuxer),
1729       "connection-speed");
1730   if (pspec != NULL)
1731     g_object_set (demuxer,
1732         "connection-speed", urisrc->connection_speed / 1000, NULL);
1733
1734   return demuxer;
1735
1736   /* ERRORS */
1737 no_demuxer:
1738   {
1739     /* FIXME: Fire the right error */
1740     GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN, (NULL),
1741         ("No demuxer element, check your installation"));
1742     return NULL;
1743   }
1744 }
1745
1746 /* Called when:
1747  * * Source element adds a new pad
1748  * * typefind has found a type
1749  */
1750 static void
1751 handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps)
1752 {
1753   GstURISourceBin *urisrc = info->urisrc;
1754   gboolean is_raw;
1755   GstStructure *s;
1756   const gchar *media_type;
1757   gboolean do_download = FALSE;
1758
1759   GST_URI_SOURCE_BIN_LOCK (urisrc);
1760
1761   /* if this is a pad with all raw caps, we can expose it */
1762   if (is_all_raw_caps (caps, DEFAULT_CAPS, &is_raw) && is_raw) {
1763     OutputSlotInfo *slot;
1764     GstPad *output_pad;
1765
1766     GST_DEBUG_OBJECT (urisrc, "Found pad with raw caps %" GST_PTR_FORMAT
1767         ", exposing", caps);
1768     slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad);
1769     output_pad = gst_object_ref (slot->output_pad);
1770     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1771
1772     expose_output_pad (urisrc, slot->output_pad);
1773     gst_object_unref (output_pad);
1774     return;
1775   }
1776   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1777
1778   s = gst_caps_get_structure (caps, 0);
1779   media_type = gst_structure_get_name (s);
1780
1781   urisrc->is_adaptive = IS_ADAPTIVE_MEDIA (media_type);
1782
1783   if (urisrc->is_adaptive) {
1784     GstPad *sinkpad;
1785     GstPadLinkReturn link_res;
1786     GstQuery *query;
1787
1788     urisrc->demuxer = make_demuxer (urisrc, info, caps);
1789     if (!urisrc->demuxer)
1790       goto no_demuxer;
1791     gst_bin_add (GST_BIN_CAST (urisrc), urisrc->demuxer);
1792
1793     /* Query the demuxer to see if it can handle buffering */
1794     query = gst_query_new_buffering (GST_FORMAT_TIME);
1795     urisrc->demuxer_handles_buffering =
1796         gst_element_query (urisrc->demuxer, query);
1797     gst_query_unref (query);
1798     GST_DEBUG_OBJECT (urisrc, "Demuxer handles buffering : %d",
1799         urisrc->demuxer_handles_buffering);
1800
1801     sinkpad = gst_element_get_static_pad (urisrc->demuxer, "sink");
1802     if (sinkpad == NULL)
1803       goto no_demuxer_sink;
1804
1805     link_res = gst_pad_link (srcpad, sinkpad);
1806
1807     gst_object_unref (sinkpad);
1808     if (link_res != GST_PAD_LINK_OK)
1809       goto could_not_link;
1810
1811     gst_element_sync_state_with_parent (urisrc->demuxer);
1812   } else if (!urisrc->is_stream) {
1813     OutputSlotInfo *slot;
1814     GstPad *output_pad;
1815
1816     /* We don't need buffering here, expose immediately */
1817     GST_URI_SOURCE_BIN_LOCK (urisrc);
1818     slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad);
1819     output_pad = gst_object_ref (slot->output_pad);
1820     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1821     expose_output_pad (urisrc, output_pad);
1822     gst_object_unref (output_pad);
1823   } else {
1824     OutputSlotInfo *slot;
1825     GstPad *output_pad;
1826
1827     /* only enable download buffering if the upstream duration is known */
1828     if (urisrc->download) {
1829       GstQuery *query = gst_query_new_duration (GST_FORMAT_BYTES);
1830       if (gst_pad_query (srcpad, query)) {
1831         gint64 dur;
1832         gst_query_parse_duration (query, NULL, &dur);
1833         do_download = (dur != -1);
1834       }
1835       gst_query_unref (query);
1836     }
1837
1838     GST_DEBUG_OBJECT (urisrc, "check media-type %s, do_download:%d", media_type,
1839         do_download);
1840
1841     GST_URI_SOURCE_BIN_LOCK (urisrc);
1842     slot = new_output_slot (info, do_download, FALSE, FALSE, srcpad);
1843
1844     gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
1845         pre_queue_event_probe, urisrc, NULL);
1846
1847     output_pad = gst_object_ref (slot->output_pad);
1848     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1849
1850     expose_output_pad (urisrc, output_pad);
1851     gst_object_unref (output_pad);
1852   }
1853
1854   return;
1855
1856   /* ERRORS */
1857 no_demuxer:
1858   {
1859     /* error was posted */
1860     return;
1861   }
1862 no_demuxer_sink:
1863   {
1864     GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
1865         (NULL), ("Adaptive demuxer element has no 'sink' pad"));
1866     return;
1867   }
1868 could_not_link:
1869   {
1870     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1871     GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
1872         (NULL), ("Can't link typefind to adaptive demuxer element"));
1873     return;
1874   }
1875 }
1876
1877 /* signaled when we have a stream and we need to configure the download
1878  * buffering or regular buffering */
1879 static void
1880 type_found (GstElement * typefind, guint probability,
1881     GstCaps * caps, ChildSrcPadInfo * info)
1882 {
1883   GstURISourceBin *urisrc = info->urisrc;
1884   GstPad *srcpad = gst_element_get_static_pad (typefind, "src");
1885
1886   GST_DEBUG_OBJECT (urisrc, "typefind found caps %" GST_PTR_FORMAT
1887       " on pad %" GST_PTR_FORMAT, caps, srcpad);
1888   handle_new_pad (info, srcpad, caps);
1889
1890   gst_object_unref (GST_OBJECT (srcpad));
1891 }
1892
1893 /* setup typefind for any source. This will first plug a typefind element to the
1894  * source. After we find the type, we decide to whether to plug an adaptive
1895  * demuxer, or just link through queue2 (if needed) and expose the data */
1896 static gboolean
1897 setup_typefind (ChildSrcPadInfo * info)
1898 {
1899   GstURISourceBin *urisrc = info->urisrc;
1900   GstPad *sinkpad;
1901
1902   /* now create the typefind element */
1903   info->typefind = gst_element_factory_make ("typefind", NULL);
1904   if (!info->typefind)
1905     goto no_typefind;
1906
1907   /* Make sure the bin doesn't set the typefind running yet */
1908   gst_element_set_locked_state (info->typefind, TRUE);
1909
1910   gst_bin_add (GST_BIN_CAST (urisrc), info->typefind);
1911
1912   sinkpad = gst_element_get_static_pad (info->typefind, "sink");
1913   if (gst_pad_link (info->src_pad, sinkpad) != GST_PAD_LINK_OK)
1914     goto could_not_link;
1915   gst_object_unref (sinkpad);
1916
1917   /* connect a signal to find out when the typefind element found
1918    * a type */
1919   g_signal_connect (info->typefind, "have-type", G_CALLBACK (type_found), info);
1920
1921   /* Now it can start */
1922   gst_element_set_locked_state (info->typefind, FALSE);
1923   gst_element_sync_state_with_parent (info->typefind);
1924
1925   return TRUE;
1926
1927   /* ERRORS */
1928 no_typefind:
1929   {
1930     post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), "typefind");
1931     GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN, (NULL),
1932         ("No typefind element, check your installation"));
1933     return FALSE;
1934   }
1935 could_not_link:
1936   {
1937     gst_object_unref (sinkpad);
1938     GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
1939         (NULL), ("Can't link source to typefind element"));
1940     return FALSE;
1941   }
1942 }
1943
1944 /* CALL WITH URISOURCEBIN LOCK */
1945 static void
1946 free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc)
1947 {
1948   GST_DEBUG_OBJECT (urisrc,
1949       "removing output slot %" GST_PTR_FORMAT " -> %" GST_PTR_FORMAT,
1950       slot->originating_pad, slot->output_pad);
1951
1952   if (slot->queue) {
1953     if (slot->bitrate_changed_id > 0)
1954       g_signal_handler_disconnect (slot->queue, slot->bitrate_changed_id);
1955     slot->bitrate_changed_id = 0;
1956
1957     gst_element_set_locked_state (slot->queue, TRUE);
1958     gst_element_set_state (slot->queue, GST_STATE_NULL);
1959     remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
1960     gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue);
1961
1962     gst_object_unref (slot->queue_sinkpad);
1963   }
1964
1965   if (slot->demuxer_event_probe_id)
1966     gst_pad_remove_probe (slot->originating_pad, slot->demuxer_event_probe_id);
1967
1968   gst_object_unref (slot->originating_pad);
1969   /* deactivate and remove the srcpad */
1970   gst_pad_set_active (slot->output_pad, FALSE);
1971   gst_element_remove_pad (GST_ELEMENT_CAST (urisrc), slot->output_pad);
1972
1973   g_free (slot);
1974 }
1975
1976 static void
1977 call_free_output_slot (GstURISourceBin * urisrc, OutputSlotInfo * slot)
1978 {
1979   GST_LOG_OBJECT (urisrc, "free output slot in thread pool");
1980   free_output_slot (slot, urisrc);
1981 }
1982
1983 /* must be called with GST_URI_SOURCE_BIN_LOCK */
1984 static void
1985 free_output_slot_async (GstURISourceBin * urisrc, OutputSlotInfo * slot)
1986 {
1987   GST_LOG_OBJECT (urisrc, "pushing output slot on thread pool to free");
1988   slot->linked_info->outputs = g_list_remove (slot->linked_info->outputs, slot);
1989   gst_element_call_async (GST_ELEMENT_CAST (urisrc),
1990       (GstElementCallAsyncFunc) call_free_output_slot, slot, NULL);
1991 }
1992
1993 /* remove source and all related elements */
1994 static void
1995 remove_source (GstURISourceBin * urisrc)
1996 {
1997   if (urisrc->source) {
1998     GstElement *source = urisrc->source;
1999
2000     GST_DEBUG_OBJECT (urisrc, "removing old src element");
2001     gst_element_set_state (source, GST_STATE_NULL);
2002
2003     if (urisrc->src_np_sig_id) {
2004       g_signal_handler_disconnect (source, urisrc->src_np_sig_id);
2005       urisrc->src_np_sig_id = 0;
2006     }
2007     gst_bin_remove (GST_BIN_CAST (urisrc), source);
2008     urisrc->source = NULL;
2009   }
2010
2011   GST_URI_SOURCE_BIN_LOCK (urisrc);
2012   if (urisrc->src_infos) {
2013     g_list_foreach (urisrc->src_infos, (GFunc) free_child_src_pad_info, urisrc);
2014     g_list_free (urisrc->src_infos);
2015     urisrc->src_infos = NULL;
2016   }
2017   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
2018
2019   if (urisrc->demuxer)
2020     remove_demuxer (urisrc);
2021 }
2022
2023 /* is called when a dynamic source element created a new pad. */
2024 static void
2025 source_new_pad (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
2026 {
2027   GstCaps *caps;
2028   ChildSrcPadInfo *info = new_child_src_pad_info (urisrc, pad);
2029
2030   GST_DEBUG_OBJECT (urisrc, "Found new pad %s.%s in source element %s",
2031       GST_DEBUG_PAD_NAME (pad), GST_ELEMENT_NAME (element));
2032
2033   caps = gst_pad_get_current_caps (pad);
2034   GST_DEBUG_OBJECT (urisrc, "caps %" GST_PTR_FORMAT, caps);
2035   if (caps == NULL)
2036     setup_typefind (info);
2037   else {
2038     handle_new_pad (info, pad, caps);
2039     gst_caps_unref (caps);
2040   }
2041 }
2042
2043 /* construct and run the source and demuxer elements until we found
2044  * all the streams or until a preroll queue has been filled.
2045 */
2046 static gboolean
2047 setup_source (GstURISourceBin * urisrc)
2048 {
2049   gboolean have_out, is_dynamic;
2050
2051   GST_DEBUG_OBJECT (urisrc, "setup source");
2052
2053   /* create and configure an element that can handle the uri */
2054   if (!(urisrc->source = gen_source_element (urisrc)))
2055     goto no_source;
2056
2057   /* state will be merged later - if file is not found, error will be
2058    * handled by the application right after. */
2059   gst_bin_add (GST_BIN_CAST (urisrc), urisrc->source);
2060
2061   /* notify of the new source used */
2062   g_object_notify (G_OBJECT (urisrc), "source");
2063
2064   g_signal_emit (urisrc, gst_uri_source_bin_signals[SIGNAL_SOURCE_SETUP],
2065       0, urisrc->source);
2066
2067   /* see if the source element emits raw audio/video all by itself,
2068    * if so, we can create streams for the pads and be done with it.
2069    * Also check that is has source pads, if not, we assume it will
2070    * do everything itself.  */
2071   if (!analyse_source_and_expose_raw_pads (urisrc, &have_out, &is_dynamic))
2072     goto invalid_source;
2073
2074   if (!is_dynamic) {
2075     if (!have_out)
2076       goto no_pads;
2077   } else {
2078     GST_DEBUG_OBJECT (urisrc, "Source has dynamic output pads");
2079     /* connect a handler for the new-pad signal */
2080     urisrc->src_np_sig_id =
2081         g_signal_connect (urisrc->source, "pad-added",
2082         G_CALLBACK (source_new_pad), urisrc);
2083   }
2084
2085   return TRUE;
2086
2087   /* ERRORS */
2088 no_source:
2089   {
2090     /* error message was already posted */
2091     return FALSE;
2092   }
2093 invalid_source:
2094   {
2095     GST_ELEMENT_ERROR (urisrc, CORE, FAILED,
2096         (_("Source element is invalid.")), (NULL));
2097     return FALSE;
2098   }
2099 no_pads:
2100   {
2101     GST_ELEMENT_ERROR (urisrc, CORE, FAILED,
2102         (_("Source element has no pads.")), (NULL));
2103     return FALSE;
2104   }
2105 }
2106
2107 static void
2108 value_list_append_structure_list (GValue * list_val, GstStructure ** first,
2109     GList * structure_list)
2110 {
2111   GList *l;
2112
2113   for (l = structure_list; l != NULL; l = l->next) {
2114     GValue val = { 0, };
2115
2116     if (*first == NULL)
2117       *first = gst_structure_copy ((GstStructure *) l->data);
2118
2119     g_value_init (&val, GST_TYPE_STRUCTURE);
2120     g_value_take_boxed (&val, gst_structure_copy ((GstStructure *) l->data));
2121     gst_value_list_append_value (list_val, &val);
2122     g_value_unset (&val);
2123   }
2124 }
2125
2126 /* if it's a redirect message with multiple redirect locations we might
2127  * want to pick a different 'best' location depending on the required
2128  * bitrates and the connection speed */
2129 static GstMessage *
2130 handle_redirect_message (GstURISourceBin * urisrc, GstMessage * msg)
2131 {
2132   const GValue *locations_list, *location_val;
2133   GstMessage *new_msg;
2134   GstStructure *new_structure = NULL;
2135   GList *l_good = NULL, *l_neutral = NULL, *l_bad = NULL;
2136   GValue new_list = { 0, };
2137   guint size, i;
2138   const GstStructure *structure;
2139
2140   GST_DEBUG_OBJECT (urisrc, "redirect message: %" GST_PTR_FORMAT, msg);
2141   GST_DEBUG_OBJECT (urisrc, "connection speed: %" G_GUINT64_FORMAT,
2142       urisrc->connection_speed);
2143
2144   structure = gst_message_get_structure (msg);
2145   if (urisrc->connection_speed == 0 || structure == NULL)
2146     return msg;
2147
2148   locations_list = gst_structure_get_value (structure, "locations");
2149   if (locations_list == NULL)
2150     return msg;
2151
2152   size = gst_value_list_get_size (locations_list);
2153   if (size < 2)
2154     return msg;
2155
2156   /* maintain existing order as much as possible, just sort references
2157    * with too high a bitrate to the end (the assumption being that if
2158    * bitrates are given they are given for all interesting streams and
2159    * that the you-need-at-least-version-xyz redirect has the same bitrate
2160    * as the lowest referenced redirect alternative) */
2161   for (i = 0; i < size; ++i) {
2162     const GstStructure *s;
2163     gint bitrate = 0;
2164
2165     location_val = gst_value_list_get_value (locations_list, i);
2166     s = (const GstStructure *) g_value_get_boxed (location_val);
2167     if (!gst_structure_get_int (s, "minimum-bitrate", &bitrate) || bitrate <= 0) {
2168       GST_DEBUG_OBJECT (urisrc, "no bitrate: %" GST_PTR_FORMAT, s);
2169       l_neutral = g_list_append (l_neutral, (gpointer) s);
2170     } else if (bitrate > urisrc->connection_speed) {
2171       GST_DEBUG_OBJECT (urisrc, "bitrate too high: %" GST_PTR_FORMAT, s);
2172       l_bad = g_list_append (l_bad, (gpointer) s);
2173     } else if (bitrate <= urisrc->connection_speed) {
2174       GST_DEBUG_OBJECT (urisrc, "bitrate OK: %" GST_PTR_FORMAT, s);
2175       l_good = g_list_append (l_good, (gpointer) s);
2176     }
2177   }
2178
2179   g_value_init (&new_list, GST_TYPE_LIST);
2180   value_list_append_structure_list (&new_list, &new_structure, l_good);
2181   value_list_append_structure_list (&new_list, &new_structure, l_neutral);
2182   value_list_append_structure_list (&new_list, &new_structure, l_bad);
2183   gst_structure_take_value (new_structure, "locations", &new_list);
2184
2185   g_list_free (l_good);
2186   g_list_free (l_neutral);
2187   g_list_free (l_bad);
2188
2189   new_msg = gst_message_new_element (msg->src, new_structure);
2190   gst_message_unref (msg);
2191
2192   GST_DEBUG_OBJECT (urisrc, "new redirect message: %" GST_PTR_FORMAT, new_msg);
2193   return new_msg;
2194 }
2195
2196 /* CALL WITH URISOURCEBIN LOCK */
2197 static OutputSlotInfo *
2198 output_slot_for_buffering_element (GstURISourceBin * urisrc,
2199     GstElement * element)
2200 {
2201   GList *top, *iter;
2202   for (top = urisrc->src_infos; top; top = top->next) {
2203     ChildSrcPadInfo *info = top->data;
2204     for (iter = info->outputs; iter; iter = iter->next) {
2205       OutputSlotInfo *slot = iter->data;
2206       if (slot->queue == element)
2207         return slot;
2208     }
2209   }
2210
2211   return NULL;
2212 }
2213
2214 static void
2215 handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
2216 {
2217   gint perc, msg_perc;
2218   gint smaller_perc = 100;
2219   GstMessage *smaller = NULL;
2220   GList *found = NULL;
2221   GList *iter;
2222   OutputSlotInfo *slot;
2223
2224   /* buffering messages must be aggregated as there might be multiple buffering
2225    * elements in the pipeline and their independent buffering messages will
2226    * confuse the application
2227    *
2228    * urisourcebin keeps a list of messages received from elements that are
2229    * buffering.
2230    * Rules are:
2231    * 0) Ignore buffering from elements that are draining (is_eos == TRUE)
2232    * 1) Always post the smaller buffering %
2233    * 2) If an element posts a 100% buffering message, remove it from the list
2234    * 3) When there are no more messages on the list, post 100% message
2235    * 4) When an element posts a new buffering message, update the one
2236    *    on the list to this new value
2237    */
2238   gst_message_parse_buffering (msg, &msg_perc);
2239   GST_LOG_OBJECT (urisrc, "Got buffering msg from %" GST_PTR_FORMAT
2240       " with %d%%", GST_MESSAGE_SRC (msg), msg_perc);
2241
2242   BUFFERING_LOCK (urisrc);
2243   slot =
2244       output_slot_for_buffering_element (urisrc,
2245       (GstElement *) GST_MESSAGE_SRC (msg));
2246   if (slot && slot->is_eos) {
2247     /* Ignore buffering messages from queues we marked as EOS,
2248      * we already removed those from the list of buffering
2249      * objects */
2250     BUFFERING_UNLOCK (urisrc);
2251     gst_message_replace (&msg, NULL);
2252     return;
2253   }
2254
2255
2256   g_mutex_lock (&urisrc->buffering_post_lock);
2257
2258   /*
2259    * Single loop for 2 things:
2260    * 1) Look for a message with the same source
2261    *   1.1) If the received message is 100%, remove it from the list
2262    * 2) Find the minimum buffering from the list from elements that aren't EOS
2263    */
2264   for (iter = urisrc->buffering_status; iter;) {
2265     GstMessage *bufstats = iter->data;
2266     gboolean is_eos = FALSE;
2267
2268     slot =
2269         output_slot_for_buffering_element (urisrc,
2270         (GstElement *) GST_MESSAGE_SRC (msg));
2271     if (slot)
2272       is_eos = slot->is_eos;
2273
2274     if (GST_MESSAGE_SRC (bufstats) == GST_MESSAGE_SRC (msg)) {
2275       found = iter;
2276       if (msg_perc < 100) {
2277         gst_message_unref (iter->data);
2278         bufstats = iter->data = gst_message_ref (msg);
2279       } else {
2280         GList *current = iter;
2281
2282         /* remove the element here and avoid confusing the loop */
2283         iter = g_list_next (iter);
2284
2285         gst_message_unref (current->data);
2286         urisrc->buffering_status =
2287             g_list_delete_link (urisrc->buffering_status, current);
2288
2289         continue;
2290       }
2291     }
2292
2293     /* only update minimum stat for non-EOS slots */
2294     if (!is_eos) {
2295       gst_message_parse_buffering (bufstats, &perc);
2296       if (perc < smaller_perc) {
2297         smaller_perc = perc;
2298         smaller = bufstats;
2299       }
2300     } else {
2301       GST_LOG_OBJECT (urisrc, "Ignoring buffering from EOS element");
2302     }
2303     iter = g_list_next (iter);
2304   }
2305
2306   if (found == NULL && msg_perc < 100) {
2307     if (msg_perc < smaller_perc) {
2308       smaller_perc = msg_perc;
2309       smaller = msg;
2310     }
2311     urisrc->buffering_status =
2312         g_list_prepend (urisrc->buffering_status, gst_message_ref (msg));
2313   }
2314
2315   if (smaller_perc == urisrc->last_buffering_pct) {
2316     /* Don't repeat our last buffering status */
2317     gst_message_replace (&msg, NULL);
2318   } else {
2319     urisrc->last_buffering_pct = smaller_perc;
2320
2321     /* now compute the buffering message that should be posted */
2322     if (smaller_perc == 100) {
2323       g_assert (urisrc->buffering_status == NULL);
2324       /* we are posting the original received msg */
2325     } else {
2326       gst_message_replace (&msg, smaller);
2327     }
2328   }
2329   BUFFERING_UNLOCK (urisrc);
2330
2331   if (msg) {
2332     GST_LOG_OBJECT (urisrc, "Sending buffering msg from %" GST_PTR_FORMAT
2333         " with %d%%", GST_MESSAGE_SRC (msg), smaller_perc);
2334     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN (urisrc), msg);
2335   } else {
2336     GST_LOG_OBJECT (urisrc, "Dropped buffering msg as a repeat of %d%%",
2337         smaller_perc);
2338   }
2339   g_mutex_unlock (&urisrc->buffering_post_lock);
2340 }
2341
2342 /* Remove any buffering message from the given source */
2343 static void
2344 remove_buffering_msgs (GstURISourceBin * urisrc, GstObject * src)
2345 {
2346   GList *iter;
2347   gboolean removed = FALSE, post;
2348
2349   BUFFERING_LOCK (urisrc);
2350   g_mutex_lock (&urisrc->buffering_post_lock);
2351
2352   GST_DEBUG_OBJECT (urisrc, "Removing %" GST_PTR_FORMAT
2353       " buffering messages", src);
2354
2355   for (iter = urisrc->buffering_status; iter;) {
2356     GstMessage *bufstats = iter->data;
2357     if (GST_MESSAGE_SRC (bufstats) == src) {
2358       gst_message_unref (bufstats);
2359       urisrc->buffering_status =
2360           g_list_delete_link (urisrc->buffering_status, iter);
2361       removed = TRUE;
2362       break;
2363     }
2364     iter = g_list_next (iter);
2365   }
2366
2367   post = (removed && urisrc->buffering_status == NULL);
2368   BUFFERING_UNLOCK (urisrc);
2369
2370   if (post) {
2371     GST_DEBUG_OBJECT (urisrc, "Last buffering element done - posting 100%%");
2372
2373     /* removed the last buffering element, post 100% */
2374     gst_element_post_message (GST_ELEMENT_CAST (urisrc),
2375         gst_message_new_buffering (GST_OBJECT_CAST (urisrc), 100));
2376   }
2377
2378   g_mutex_unlock (&urisrc->buffering_post_lock);
2379 }
2380
2381 static void
2382 handle_message (GstBin * bin, GstMessage * msg)
2383 {
2384   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (bin);
2385
2386   switch (GST_MESSAGE_TYPE (msg)) {
2387     case GST_MESSAGE_ELEMENT:{
2388       if (gst_message_has_name (msg, "redirect")) {
2389         /* sort redirect messages based on the connection speed. This simplifies
2390          * the user of this element as it can in most cases just pick the first item
2391          * of the sorted list as a good redirection candidate. It can of course
2392          * choose something else from the list if it has a better way. */
2393         msg = handle_redirect_message (urisrc, msg);
2394       }
2395       break;
2396     }
2397     case GST_MESSAGE_BUFFERING:
2398       handle_buffering_message (urisrc, msg);
2399       msg = NULL;
2400       break;
2401     default:
2402       break;
2403   }
2404
2405   if (msg)
2406     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
2407 }
2408
2409 /* generic struct passed to all query fold methods
2410  * FIXME, move to core.
2411  */
2412 typedef struct
2413 {
2414   GstQuery *query;
2415   gint64 min;
2416   gint64 max;
2417   gboolean seekable;
2418   gboolean live;
2419 } QueryFold;
2420
2421 typedef void (*QueryInitFunction) (GstURISourceBin * urisrc, QueryFold * fold);
2422 typedef void (*QueryDoneFunction) (GstURISourceBin * urisrc, QueryFold * fold);
2423
2424 /* for duration/position we collect all durations/positions and take
2425  * the MAX of all valid results */
2426 static void
2427 uri_source_query_init (GstURISourceBin * urisrc, QueryFold * fold)
2428 {
2429   fold->min = 0;
2430   fold->max = -1;
2431   fold->seekable = TRUE;
2432   fold->live = 0;
2433 }
2434
2435 static gboolean
2436 uri_source_query_duration_fold (const GValue * item, GValue * ret,
2437     QueryFold * fold)
2438 {
2439   GstPad *pad = g_value_get_object (item);
2440
2441   if (gst_pad_query (pad, fold->query)) {
2442     gint64 duration;
2443
2444     g_value_set_boolean (ret, TRUE);
2445
2446     gst_query_parse_duration (fold->query, NULL, &duration);
2447
2448     GST_DEBUG_OBJECT (item, "got duration %" G_GINT64_FORMAT, duration);
2449
2450     if (duration > fold->max)
2451       fold->max = duration;
2452   }
2453   return TRUE;
2454 }
2455
2456 static void
2457 uri_source_query_duration_done (GstURISourceBin * urisrc, QueryFold * fold)
2458 {
2459   GstFormat format;
2460
2461   gst_query_parse_duration (fold->query, &format, NULL);
2462   /* store max in query result */
2463   gst_query_set_duration (fold->query, format, fold->max);
2464
2465   GST_DEBUG ("max duration %" G_GINT64_FORMAT, fold->max);
2466 }
2467
2468 static gboolean
2469 uri_source_query_position_fold (const GValue * item, GValue * ret,
2470     QueryFold * fold)
2471 {
2472   GstPad *pad = g_value_get_object (item);
2473
2474   if (gst_pad_query (pad, fold->query)) {
2475     gint64 position;
2476
2477     g_value_set_boolean (ret, TRUE);
2478
2479     gst_query_parse_position (fold->query, NULL, &position);
2480
2481     GST_DEBUG_OBJECT (item, "got position %" G_GINT64_FORMAT, position);
2482
2483     if (position > fold->max)
2484       fold->max = position;
2485   }
2486
2487   return TRUE;
2488 }
2489
2490 static void
2491 uri_source_query_position_done (GstURISourceBin * urisrc, QueryFold * fold)
2492 {
2493   GstFormat format;
2494
2495   gst_query_parse_position (fold->query, &format, NULL);
2496   /* store max in query result */
2497   gst_query_set_position (fold->query, format, fold->max);
2498
2499   GST_DEBUG_OBJECT (urisrc, "max position %" G_GINT64_FORMAT, fold->max);
2500 }
2501
2502 static gboolean
2503 uri_source_query_latency_fold (const GValue * item, GValue * ret,
2504     QueryFold * fold)
2505 {
2506   GstPad *pad = g_value_get_object (item);
2507
2508   if (gst_pad_query (pad, fold->query)) {
2509     GstClockTime min, max;
2510     gboolean live;
2511
2512     gst_query_parse_latency (fold->query, &live, &min, &max);
2513
2514     GST_DEBUG_OBJECT (pad,
2515         "got latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
2516         ", live %d", GST_TIME_ARGS (min), GST_TIME_ARGS (max), live);
2517
2518     if (live) {
2519       /* for the combined latency we collect the MAX of all min latencies and
2520        * the MIN of all max latencies */
2521       if (min > fold->min)
2522         fold->min = min;
2523       if (fold->max == -1)
2524         fold->max = max;
2525       else if (max < fold->max)
2526         fold->max = max;
2527
2528       fold->live = TRUE;
2529     }
2530   } else {
2531     GST_LOG_OBJECT (pad, "latency query failed");
2532     g_value_set_boolean (ret, FALSE);
2533   }
2534
2535   return TRUE;
2536 }
2537
2538 static void
2539 uri_source_query_latency_done (GstURISourceBin * urisrc, QueryFold * fold)
2540 {
2541   /* store max in query result */
2542   gst_query_set_latency (fold->query, fold->live, fold->min, fold->max);
2543
2544   GST_DEBUG_OBJECT (urisrc,
2545       "latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
2546       ", live %d", GST_TIME_ARGS (fold->min), GST_TIME_ARGS (fold->max),
2547       fold->live);
2548 }
2549
2550 /* we are seekable if all srcpads are seekable */
2551 static gboolean
2552 uri_source_query_seeking_fold (const GValue * item, GValue * ret,
2553     QueryFold * fold)
2554 {
2555   GstPad *pad = g_value_get_object (item);
2556
2557   if (gst_pad_query (pad, fold->query)) {
2558     gboolean seekable;
2559
2560     g_value_set_boolean (ret, TRUE);
2561     gst_query_parse_seeking (fold->query, NULL, &seekable, NULL, NULL);
2562
2563     GST_DEBUG_OBJECT (item, "got seekable %d", seekable);
2564
2565     if (fold->seekable)
2566       fold->seekable = seekable;
2567   }
2568
2569   return TRUE;
2570 }
2571
2572 static void
2573 uri_source_query_seeking_done (GstURISourceBin * urisrc, QueryFold * fold)
2574 {
2575   GstFormat format;
2576
2577   gst_query_parse_seeking (fold->query, &format, NULL, NULL, NULL);
2578   gst_query_set_seeking (fold->query, format, fold->seekable, 0, -1);
2579
2580   GST_DEBUG_OBJECT (urisrc, "seekable %d", fold->seekable);
2581 }
2582
2583 /* generic fold, return first valid result */
2584 static gboolean
2585 uri_source_query_generic_fold (const GValue * item, GValue * ret,
2586     QueryFold * fold)
2587 {
2588   GstPad *pad = g_value_get_object (item);
2589   gboolean res;
2590
2591   if ((res = gst_pad_query (pad, fold->query))) {
2592     g_value_set_boolean (ret, TRUE);
2593     GST_DEBUG_OBJECT (item, "answered query %p", fold->query);
2594   }
2595
2596   /* and stop as soon as we have a valid result */
2597   return !res;
2598 }
2599
2600 /* we're a bin, the default query handler iterates sink elements, which we don't
2601  * have normally. We should just query all source pads.
2602  */
2603 static gboolean
2604 gst_uri_source_bin_query (GstElement * element, GstQuery * query)
2605 {
2606   GstURISourceBin *urisrc;
2607   gboolean res = FALSE;
2608   GstIterator *iter;
2609   GstIteratorFoldFunction fold_func;
2610   QueryInitFunction fold_init = NULL;
2611   QueryDoneFunction fold_done = NULL;
2612   QueryFold fold_data;
2613   GValue ret = { 0 };
2614   gboolean default_ret = FALSE;
2615
2616   urisrc = GST_URI_SOURCE_BIN (element);
2617
2618   switch (GST_QUERY_TYPE (query)) {
2619     case GST_QUERY_DURATION:
2620       /* iterate and collect durations */
2621       fold_func = (GstIteratorFoldFunction) uri_source_query_duration_fold;
2622       fold_init = uri_source_query_init;
2623       fold_done = uri_source_query_duration_done;
2624       break;
2625     case GST_QUERY_POSITION:
2626       /* iterate and collect durations */
2627       fold_func = (GstIteratorFoldFunction) uri_source_query_position_fold;
2628       fold_init = uri_source_query_init;
2629       fold_done = uri_source_query_position_done;
2630       break;
2631     case GST_QUERY_LATENCY:
2632       /* iterate and collect durations */
2633       fold_func = (GstIteratorFoldFunction) uri_source_query_latency_fold;
2634       fold_init = uri_source_query_init;
2635       fold_done = uri_source_query_latency_done;
2636       default_ret = TRUE;
2637       break;
2638     case GST_QUERY_SEEKING:
2639       /* iterate and collect durations */
2640       fold_func = (GstIteratorFoldFunction) uri_source_query_seeking_fold;
2641       fold_init = uri_source_query_init;
2642       fold_done = uri_source_query_seeking_done;
2643       break;
2644     default:
2645       fold_func = (GstIteratorFoldFunction) uri_source_query_generic_fold;
2646       break;
2647   }
2648
2649   fold_data.query = query;
2650
2651   g_value_init (&ret, G_TYPE_BOOLEAN);
2652   g_value_set_boolean (&ret, default_ret);
2653
2654   iter = gst_element_iterate_src_pads (element);
2655   GST_DEBUG_OBJECT (element, "Sending query %p (type %d) to src pads",
2656       query, GST_QUERY_TYPE (query));
2657
2658   if (fold_init)
2659     fold_init (urisrc, &fold_data);
2660
2661   while (TRUE) {
2662     GstIteratorResult ires;
2663
2664     ires = gst_iterator_fold (iter, fold_func, &ret, &fold_data);
2665
2666     switch (ires) {
2667       case GST_ITERATOR_RESYNC:
2668         gst_iterator_resync (iter);
2669         if (fold_init)
2670           fold_init (urisrc, &fold_data);
2671         g_value_set_boolean (&ret, default_ret);
2672         break;
2673       case GST_ITERATOR_OK:
2674       case GST_ITERATOR_DONE:
2675         res = g_value_get_boolean (&ret);
2676         if (fold_done != NULL && res)
2677           fold_done (urisrc, &fold_data);
2678         goto done;
2679       default:
2680         res = FALSE;
2681         goto done;
2682     }
2683   }
2684 done:
2685   gst_iterator_free (iter);
2686
2687   return res;
2688 }
2689
2690 static GstStateChangeReturn
2691 gst_uri_source_bin_change_state (GstElement * element,
2692     GstStateChange transition)
2693 {
2694   GstStateChangeReturn ret;
2695   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (element);
2696
2697   switch (transition) {
2698     case GST_STATE_CHANGE_READY_TO_PAUSED:
2699       GST_URI_SOURCE_BIN_LOCK (element);
2700       urisrc->flushing = FALSE;
2701       urisrc->activated = FALSE;
2702       GST_URI_SOURCE_BIN_UNLOCK (element);
2703       GST_DEBUG ("ready to paused");
2704       if (!setup_source (urisrc))
2705         goto source_failed;
2706       break;
2707     case GST_STATE_CHANGE_PAUSED_TO_READY:
2708       GST_URI_SOURCE_BIN_LOCK (element);
2709       urisrc->flushing = TRUE;
2710       g_cond_broadcast (&urisrc->activation_cond);
2711       GST_URI_SOURCE_BIN_UNLOCK (element);
2712     default:
2713       break;
2714   }
2715
2716   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2717   if (ret == GST_STATE_CHANGE_FAILURE)
2718     goto setup_failed;
2719
2720   switch (transition) {
2721     case GST_STATE_CHANGE_READY_TO_PAUSED:
2722     {
2723       GST_URI_SOURCE_BIN_LOCK (element);
2724       GST_DEBUG_OBJECT (urisrc, "Potentially exposing pads");
2725       urisrc->activated = TRUE;
2726       g_cond_broadcast (&urisrc->activation_cond);
2727       GST_URI_SOURCE_BIN_UNLOCK (element);
2728     }
2729       break;
2730     case GST_STATE_CHANGE_PAUSED_TO_READY:
2731       GST_DEBUG ("paused to ready");
2732       remove_source (urisrc);
2733       g_list_free_full (urisrc->buffering_status,
2734           (GDestroyNotify) gst_message_unref);
2735       urisrc->buffering_status = NULL;
2736       urisrc->last_buffering_pct = -1;
2737       break;
2738     case GST_STATE_CHANGE_READY_TO_NULL:
2739       GST_DEBUG ("ready to null");
2740       remove_source (urisrc);
2741       break;
2742     default:
2743       break;
2744   }
2745   return ret;
2746
2747   /* ERRORS */
2748 source_failed:
2749   {
2750     return GST_STATE_CHANGE_FAILURE;
2751   }
2752 setup_failed:
2753   {
2754     /* clean up leftover groups */
2755     return GST_STATE_CHANGE_FAILURE;
2756   }
2757 }