urisourcebin: Use iterator function where applicable
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-base / gst / playback / gsturisourcebin.c
1 /* GStreamer
2  * Copyright (C) <2015> Jan Schmidt <jan@centricular.com>
3  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * SECTION:element-urisourcebin
23  * @title: urisourcebin
24  *
25  * urisourcebin is an element for accessing URIs in a uniform manner.
26  *
27  * It handles selecting a URI source element and potentially download
28  * buffering for network sources. It produces one or more source pads,
29  * depending on the input source, for feeding to decoding chains or decodebin.
30  *
31  * The main configuration is via the #GstURISourceBin:uri property.
32  *
33  * > urisourcebin is still experimental API and a technology preview.
34  * > Its behaviour and exposed API is subject to change.
35  */
36
37 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
38  * with newer GLib versions (>= 2.31.0) */
39 #define GLIB_DISABLE_DEPRECATION_WARNINGS
40
41 #ifdef HAVE_CONFIG_H
42 #  include "config.h"
43 #endif
44
45 #include <string.h>
46
47 #include <gst/gst.h>
48 #include <glib/gi18n-lib.h>
49 #include <gst/pbutils/missing-plugins.h>
50
51 #include "gstplay-enum.h"
52 #include "gstrawcaps.h"
53 #include "gstplaybackelements.h"
54 #include "gstplaybackutils.h"
55
56 #define GST_TYPE_URI_SOURCE_BIN \
57   (gst_uri_source_bin_get_type())
58 #define GST_URI_SOURCE_BIN(obj) \
59   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_URI_SOURCE_BIN,GstURISourceBin))
60 #define GST_URI_SOURCE_BIN_CLASS(klass) \
61   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_URI_SOURCE_BIN,GstURISourceBinClass))
62 #define GST_IS_URI_SOURCE_BIN(obj) \
63   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_URI_SOURCE_BIN))
64 #define GST_IS_URI_SOURCE_BIN_CLASS(klass) \
65   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_URI_SOURCE_BIN))
66 #define GST_URI_SOURCE_BIN_CAST(obj) ((GstURISourceBin *) (obj))
67
68 typedef struct _GstURISourceBin GstURISourceBin;
69 typedef struct _GstURISourceBinClass GstURISourceBinClass;
70 typedef struct _ChildSrcPadInfo ChildSrcPadInfo;
71 typedef struct _OutputSlotInfo OutputSlotInfo;
72
73 #define GST_URI_SOURCE_BIN_LOCK(urisrc) (g_mutex_lock(&((GstURISourceBin*)(urisrc))->lock))
74 #define GST_URI_SOURCE_BIN_UNLOCK(urisrc) (g_mutex_unlock(&((GstURISourceBin*)(urisrc))->lock))
75
76 #define BUFFERING_LOCK(ubin) G_STMT_START {                             \
77     GST_LOG_OBJECT (ubin,                                               \
78                     "buffering locking from thread %p",                 \
79                     g_thread_self ());                                  \
80     g_mutex_lock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock);              \
81     GST_LOG_OBJECT (ubin,                                               \
82                     "buffering lock from thread %p",                    \
83                     g_thread_self ());                                  \
84 } G_STMT_END
85
86 #define BUFFERING_UNLOCK(ubin) G_STMT_START {                           \
87     GST_LOG_OBJECT (ubin,                                               \
88                     "buffering unlocking from thread %p",               \
89                     g_thread_self ());                                  \
90     g_mutex_unlock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock);            \
91 } G_STMT_END
92
93 /* Track a source pad from a child (source, typefind, demuxer) that is linked or
94  * needs linking to an output slot, or source pads that are directly exposed as
95  * ghost pads */
96 struct _ChildSrcPadInfo
97 {
98   /* Source pad this info is attached to (not reffed, since the pad owns the
99    * ChildSrcPadInfo as qdata) */
100   GstPad *src_pad;
101
102   /* The output GhostPad if this info is for a directly exposed pad (raw or
103    * not), rather than linked through a slot. */
104   GstPad *output_pad;
105
106   /* Configured output slot, if any buffering/download is required */
107   OutputSlotInfo *output_slot;
108
109   /* ADAPTIVE DEMUXER SPECIFIC */
110   guint blocking_probe_id;
111   guint event_probe_id;
112
113   /* Current caps of the adaptive demuxer source pad. Used to link new pending
114    * pads to a compatible (old) output slot.
115    *
116    * NOTE : This will eventually go away once only streams-aware elements are
117    * allowed within urisourcebin
118    */
119   GstCaps *cur_caps;
120
121 };
122
123 /* Output Slot:
124  *
125  * Buffered output
126  */
127 struct _OutputSlotInfo
128 {
129   ChildSrcPadInfo *linked_info; /* demux source pad info feeding this slot, if any */
130   GstElement *queue;            /* queue2 or downloadbuffer */
131   GstPad *queue_sinkpad;        /* Sink pad of the queue eleemnt */
132   GstPad *output_pad;           /* Output ghost pad */
133   gboolean is_eos;              /* Did EOS get fed into the buffering element */
134
135   gulong bitrate_changed_id;    /* queue bitrate changed notification */
136 };
137
138 /**
139  * GstURISourceBin
140  *
141  * urisourcebin element struct
142  */
143 struct _GstURISourceBin
144 {
145   GstBin parent_instance;
146
147   GMutex lock;                  /* lock for constructing */
148
149   gchar *uri;
150   guint64 connection_speed;
151
152   gboolean is_stream;
153   gboolean is_adaptive;
154   gboolean demuxer_handles_buffering;   /* If TRUE: Don't use buffering elements */
155   gboolean source_streams_aware;        /* if TRUE: Don't block output pads */
156   guint64 buffer_duration;      /* When buffering, buffer duration (ns) */
157   guint buffer_size;            /* When buffering, buffer size (bytes) */
158   gboolean download;
159   gboolean use_buffering;
160   gdouble low_watermark;
161   gdouble high_watermark;
162
163   GstElement *source;
164   GList *typefinds;             /* list of typefind element */
165
166   GstElement *demuxer;          /* Adaptive demuxer if any */
167   GSList *out_slots;
168
169   guint numpads;
170
171   /* for dynamic sources */
172   guint src_np_sig_id;          /* new-pad signal id */
173
174   guint64 ring_buffer_max_size; /* 0 means disabled */
175
176   GList *pending_pads;          /* Pads we have blocked pending assignment
177                                    to an output source pad */
178
179   GList *buffering_status;      /* element currently buffering messages */
180   gint last_buffering_pct;      /* Avoid sending buffering over and over */
181   GMutex buffering_lock;
182   GMutex buffering_post_lock;
183 };
184
185 struct _GstURISourceBinClass
186 {
187   GstBinClass parent_class;
188
189   /* emitted when all data has been drained out
190    * FIXME : What do we need this for ?? */
191   void (*drained) (GstElement * element);
192   /* emitted when all data has been fed into buffering slots (i.e the
193    * actual sources are done) */
194   void (*about_to_finish) (GstElement * element);
195 };
196
197 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
198     GST_PAD_SRC,
199     GST_PAD_SOMETIMES,
200     GST_STATIC_CAPS_ANY);
201
202 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
203
204 GST_DEBUG_CATEGORY_STATIC (gst_uri_source_bin_debug);
205 #define GST_CAT_DEFAULT gst_uri_source_bin_debug
206
207 /* signals */
208 enum
209 {
210   SIGNAL_DRAINED,
211   SIGNAL_ABOUT_TO_FINISH,
212   SIGNAL_SOURCE_SETUP,
213   LAST_SIGNAL
214 };
215
216 /* properties */
217 #define DEFAULT_PROP_URI            NULL
218 #define DEFAULT_PROP_SOURCE         NULL
219 #define DEFAULT_CONNECTION_SPEED    0
220 #define DEFAULT_BUFFER_DURATION     -1
221 #define DEFAULT_BUFFER_SIZE         -1
222 #define DEFAULT_DOWNLOAD            FALSE
223 #define DEFAULT_USE_BUFFERING       TRUE
224 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
225 #define DEFAULT_LOW_WATERMARK       0.01
226 #define DEFAULT_HIGH_WATERMARK      0.99
227
228 #define ACTUAL_DEFAULT_BUFFER_SIZE  10 * 1024 * 1024    /* The value used for byte limits when buffer-size == -1 */
229 #define ACTUAL_DEFAULT_BUFFER_DURATION  5 * GST_SECOND  /* The value used for time limits when buffer-duration == -1 */
230
231 #define GET_BUFFER_SIZE(u) ((u)->buffer_size == -1 ? ACTUAL_DEFAULT_BUFFER_SIZE : (u)->buffer_size)
232 #define GET_BUFFER_DURATION(u) ((u)->buffer_duration == -1 ? ACTUAL_DEFAULT_BUFFER_DURATION : (u)->buffer_duration)
233
234 #define DEFAULT_CAPS (gst_static_caps_get (&default_raw_caps))
235 enum
236 {
237   PROP_0,
238   PROP_URI,
239   PROP_SOURCE,
240   PROP_CONNECTION_SPEED,
241   PROP_BUFFER_SIZE,
242   PROP_BUFFER_DURATION,
243   PROP_DOWNLOAD,
244   PROP_USE_BUFFERING,
245   PROP_RING_BUFFER_MAX_SIZE,
246   PROP_LOW_WATERMARK,
247   PROP_HIGH_WATERMARK,
248   PROP_STATISTICS,
249 };
250
251 #define CUSTOM_EOS_QUARK _custom_eos_quark_get ()
252 #define CUSTOM_EOS_QUARK_DATA "custom-eos"
253 static GQuark
254 _custom_eos_quark_get (void)
255 {
256   static gsize g_quark;
257
258   if (g_once_init_enter (&g_quark)) {
259     gsize quark =
260         (gsize) g_quark_from_static_string ("urisourcebin-custom-eos");
261     g_once_init_leave (&g_quark, quark);
262   }
263   return g_quark;
264 }
265
266 static void post_missing_plugin_error (GstElement * urisrc,
267     const gchar * element_name);
268
269 static guint gst_uri_source_bin_signals[LAST_SIGNAL] = { 0 };
270
271 GType gst_uri_source_bin_get_type (void);
272 #define gst_uri_source_bin_parent_class parent_class
273 G_DEFINE_TYPE (GstURISourceBin, gst_uri_source_bin, GST_TYPE_BIN);
274
275 #define _do_init \
276     GST_DEBUG_CATEGORY_INIT (gst_uri_source_bin_debug, "urisourcebin", 0, "URI source element"); \
277     playback_element_init (plugin);
278 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (urisourcebin, "urisourcebin",
279     GST_RANK_NONE, GST_TYPE_URI_SOURCE_BIN, _do_init);
280
281 static void gst_uri_source_bin_set_property (GObject * object, guint prop_id,
282     const GValue * value, GParamSpec * pspec);
283 static void gst_uri_source_bin_get_property (GObject * object, guint prop_id,
284     GValue * value, GParamSpec * pspec);
285 static void gst_uri_source_bin_finalize (GObject * obj);
286
287 static void handle_message (GstBin * bin, GstMessage * msg);
288
289 static gboolean gst_uri_source_bin_query (GstElement * element,
290     GstQuery * query);
291 static GstStateChangeReturn gst_uri_source_bin_change_state (GstElement *
292     element, GstStateChange transition);
293
294 static void remove_demuxer (GstURISourceBin * bin);
295 static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad);
296 static OutputSlotInfo *get_output_slot (GstURISourceBin * urisrc,
297     gboolean do_download, gboolean is_adaptive, GstCaps * caps);
298 static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc);
299 static void free_output_slot_async (GstURISourceBin * urisrc,
300     OutputSlotInfo * slot);
301 static GstPad *create_output_pad (GstURISourceBin * urisrc, GstPad * pad);
302 static void remove_buffering_msgs (GstURISourceBin * bin, GstObject * src);
303
304 static void update_queue_values (GstURISourceBin * urisrc);
305 static GstStructure *get_queue_statistics (GstURISourceBin * urisrc);
306
307 static void
308 gst_uri_source_bin_class_init (GstURISourceBinClass * klass)
309 {
310   GObjectClass *gobject_class;
311   GstElementClass *gstelement_class;
312   GstBinClass *gstbin_class;
313
314   gobject_class = G_OBJECT_CLASS (klass);
315   gstelement_class = GST_ELEMENT_CLASS (klass);
316   gstbin_class = GST_BIN_CLASS (klass);
317
318   gobject_class->set_property = gst_uri_source_bin_set_property;
319   gobject_class->get_property = gst_uri_source_bin_get_property;
320   gobject_class->finalize = gst_uri_source_bin_finalize;
321
322   g_object_class_install_property (gobject_class, PROP_URI,
323       g_param_spec_string ("uri", "URI", "URI to decode",
324           DEFAULT_PROP_URI, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
325
326   g_object_class_install_property (gobject_class, PROP_SOURCE,
327       g_param_spec_object ("source", "Source", "Source object used",
328           GST_TYPE_ELEMENT, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
329
330   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
331       g_param_spec_uint64 ("connection-speed", "Connection Speed",
332           "Network connection speed in kbps (0 = unknown)",
333           0, G_MAXUINT64 / 1000, DEFAULT_CONNECTION_SPEED,
334           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
335
336   g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
337       g_param_spec_int ("buffer-size", "Buffer size (bytes)",
338           "Buffer size when buffering streams (-1 default value)",
339           -1, G_MAXINT, DEFAULT_BUFFER_SIZE,
340           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
341   g_object_class_install_property (gobject_class, PROP_BUFFER_DURATION,
342       g_param_spec_int64 ("buffer-duration", "Buffer duration (ns)",
343           "Buffer duration when buffering streams (-1 default value)",
344           -1, G_MAXINT64, DEFAULT_BUFFER_DURATION,
345           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
346
347   /**
348    * GstURISourceBin::download:
349    *
350    * For certain media type, enable download buffering.
351    */
352   g_object_class_install_property (gobject_class, PROP_DOWNLOAD,
353       g_param_spec_boolean ("download", "Download",
354           "Attempt download buffering when buffering network streams",
355           DEFAULT_DOWNLOAD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356
357   /**
358    * GstURISourceBin::use-buffering:
359    *
360    * Perform buffering using a queue2 element, and emit BUFFERING
361    * messages based on low-/high-percent thresholds of streaming data,
362    * such as adaptive-demuxer streams.
363    *
364    * When download buffering is activated and used for the current media
365    * type, this property does nothing.
366    *
367    */
368   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
369       g_param_spec_boolean ("use-buffering", "Use Buffering",
370           "Perform buffering on demuxed/parsed media",
371           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
372
373   /**
374    * GstURISourceBin::ring-buffer-max-size
375    *
376    * The maximum size of the ring buffer in kilobytes. If set to 0, the ring
377    * buffer is disabled. Default is 0.
378    *
379    */
380   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
381       g_param_spec_uint64 ("ring-buffer-max-size",
382           "Max. ring buffer size (bytes)",
383           "Max. amount of data in the ring buffer (bytes, 0 = ring buffer disabled)",
384           0, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
385           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
386
387   /**
388    * GstURISourceBin::low-watermark
389    *
390    * Proportion of the queue size (either in bytes or time) for buffering
391    * to restart when crossed from above.  Only used if use-buffering is TRUE.
392    */
393   g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
394       g_param_spec_double ("low-watermark", "Low watermark",
395           "Low threshold for buffering to start. Only used if use-buffering is True",
396           0.0, 1.0, DEFAULT_LOW_WATERMARK,
397           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398
399   /**
400    * GstURISourceBin::high-watermark
401    *
402    * Proportion of the queue size (either in bytes or time) to complete
403    * buffering.  Only used if use-buffering is TRUE.
404    */
405   g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
406       g_param_spec_double ("high-watermark", "High watermark",
407           "High threshold for buffering to finish. Only used if use-buffering is True",
408           0.0, 1.0, DEFAULT_HIGH_WATERMARK,
409           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
410
411   /**
412    * GstURISourceBin::statistics
413    *
414    * A GStructure containing the following values based on the values from
415    * all the queue's contained in this urisourcebin.
416    *
417    *  "minimum-byte-level"  G_TYPE_UINT               Minimum of the current byte levels
418    *  "maximum-byte-level"  G_TYPE_UINT               Maximum of the current byte levels
419    *  "average-byte-level"  G_TYPE_UINT               Average of the current byte levels
420    *  "minimum-time-level"  G_TYPE_UINT64             Minimum of the current time levels
421    *  "maximum-time-level"  G_TYPE_UINT64             Maximum of the current time levels
422    *  "average-time-level"  G_TYPE_UINT64             Average of the current time levels
423    */
424   g_object_class_install_property (gobject_class, PROP_STATISTICS,
425       g_param_spec_boxed ("statistics", "Queue Statistics",
426           "A set of statistics over all the queue-like elements contained in "
427           "this element", GST_TYPE_STRUCTURE,
428           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
429
430   /**
431    * GstURISourceBin::drained:
432    *
433    * This signal is emitted when the data for the current uri is played.
434    */
435   gst_uri_source_bin_signals[SIGNAL_DRAINED] =
436       g_signal_new ("drained", G_TYPE_FROM_CLASS (klass),
437       G_SIGNAL_RUN_LAST,
438       G_STRUCT_OFFSET (GstURISourceBinClass, drained), NULL, NULL, NULL,
439       G_TYPE_NONE, 0, G_TYPE_NONE);
440
441     /**
442    * GstURISourceBin::about-to-finish:
443    *
444    * This signal is emitted when the data for the current uri is played.
445    */
446   gst_uri_source_bin_signals[SIGNAL_ABOUT_TO_FINISH] =
447       g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
448       G_SIGNAL_RUN_LAST,
449       G_STRUCT_OFFSET (GstURISourceBinClass, about_to_finish), NULL, NULL, NULL,
450       G_TYPE_NONE, 0, G_TYPE_NONE);
451
452   /**
453    * GstURISourceBin::source-setup:
454    * @bin: the urisourcebin.
455    * @source: source element
456    *
457    * This signal is emitted after the source element has been created, so
458    * it can be configured by setting additional properties (e.g. set a
459    * proxy server for an http source, or set the device and read speed for
460    * an audio cd source). This is functionally equivalent to connecting to
461    * the notify::source signal, but more convenient.
462    *
463    * Since: 1.6.1
464    */
465   gst_uri_source_bin_signals[SIGNAL_SOURCE_SETUP] =
466       g_signal_new ("source-setup", G_TYPE_FROM_CLASS (klass),
467       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
468
469   gst_element_class_add_pad_template (gstelement_class,
470       gst_static_pad_template_get (&srctemplate));
471   gst_element_class_set_static_metadata (gstelement_class,
472       "URI reader", "Generic/Bin/Source",
473       "Download and buffer a URI as needed",
474       "Jan Schmidt <jan@centricular.com>");
475
476   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_uri_source_bin_query);
477   gstelement_class->change_state =
478       GST_DEBUG_FUNCPTR (gst_uri_source_bin_change_state);
479
480   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (handle_message);
481 }
482
483 static void
484 gst_uri_source_bin_init (GstURISourceBin * urisrc)
485 {
486   g_mutex_init (&urisrc->lock);
487
488   g_mutex_init (&urisrc->buffering_lock);
489   g_mutex_init (&urisrc->buffering_post_lock);
490
491   urisrc->uri = g_strdup (DEFAULT_PROP_URI);
492   urisrc->connection_speed = DEFAULT_CONNECTION_SPEED;
493
494   urisrc->buffer_duration = DEFAULT_BUFFER_DURATION;
495   urisrc->buffer_size = DEFAULT_BUFFER_SIZE;
496   urisrc->download = DEFAULT_DOWNLOAD;
497   urisrc->use_buffering = DEFAULT_USE_BUFFERING;
498   urisrc->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
499   urisrc->last_buffering_pct = -1;
500   urisrc->low_watermark = DEFAULT_LOW_WATERMARK;
501   urisrc->high_watermark = DEFAULT_HIGH_WATERMARK;
502
503   urisrc->demuxer_handles_buffering = FALSE;
504
505   GST_OBJECT_FLAG_SET (urisrc,
506       GST_ELEMENT_FLAG_SOURCE | GST_BIN_FLAG_STREAMS_AWARE);
507   gst_bin_set_suppressed_flags (GST_BIN (urisrc),
508       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
509 }
510
511 static void
512 gst_uri_source_bin_finalize (GObject * obj)
513 {
514   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (obj);
515
516   remove_demuxer (urisrc);
517   g_mutex_clear (&urisrc->lock);
518   g_mutex_clear (&urisrc->buffering_lock);
519   g_mutex_clear (&urisrc->buffering_post_lock);
520   g_free (urisrc->uri);
521
522   G_OBJECT_CLASS (parent_class)->finalize (obj);
523 }
524
525 static void
526 gst_uri_source_bin_set_property (GObject * object, guint prop_id,
527     const GValue * value, GParamSpec * pspec)
528 {
529   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (object);
530
531   switch (prop_id) {
532     case PROP_URI:
533       GST_OBJECT_LOCK (urisrc);
534       g_free (urisrc->uri);
535       urisrc->uri = g_value_dup_string (value);
536       GST_OBJECT_UNLOCK (urisrc);
537       break;
538     case PROP_CONNECTION_SPEED:
539       GST_OBJECT_LOCK (urisrc);
540       urisrc->connection_speed = g_value_get_uint64 (value) * 1000;
541       GST_OBJECT_UNLOCK (urisrc);
542       break;
543     case PROP_BUFFER_SIZE:
544       urisrc->buffer_size = g_value_get_int (value);
545       update_queue_values (urisrc);
546       break;
547     case PROP_BUFFER_DURATION:
548       urisrc->buffer_duration = g_value_get_int64 (value);
549       update_queue_values (urisrc);
550       break;
551     case PROP_DOWNLOAD:
552       urisrc->download = g_value_get_boolean (value);
553       break;
554     case PROP_USE_BUFFERING:
555       urisrc->use_buffering = g_value_get_boolean (value);
556       break;
557     case PROP_RING_BUFFER_MAX_SIZE:
558       urisrc->ring_buffer_max_size = g_value_get_uint64 (value);
559       break;
560     case PROP_LOW_WATERMARK:
561       urisrc->low_watermark = g_value_get_double (value);
562       update_queue_values (urisrc);
563       break;
564     case PROP_HIGH_WATERMARK:
565       urisrc->high_watermark = g_value_get_double (value);
566       update_queue_values (urisrc);
567       break;
568     default:
569       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
570       break;
571   }
572 }
573
574 static void
575 gst_uri_source_bin_get_property (GObject * object, guint prop_id,
576     GValue * value, GParamSpec * pspec)
577 {
578   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (object);
579
580   switch (prop_id) {
581     case PROP_URI:
582       GST_OBJECT_LOCK (urisrc);
583       g_value_set_string (value, urisrc->uri);
584       GST_OBJECT_UNLOCK (urisrc);
585       break;
586     case PROP_SOURCE:
587       GST_OBJECT_LOCK (urisrc);
588       g_value_set_object (value, urisrc->source);
589       GST_OBJECT_UNLOCK (urisrc);
590       break;
591     case PROP_CONNECTION_SPEED:
592       GST_OBJECT_LOCK (urisrc);
593       g_value_set_uint64 (value, urisrc->connection_speed / 1000);
594       GST_OBJECT_UNLOCK (urisrc);
595       break;
596     case PROP_BUFFER_SIZE:
597       GST_OBJECT_LOCK (urisrc);
598       g_value_set_int (value, urisrc->buffer_size);
599       GST_OBJECT_UNLOCK (urisrc);
600       break;
601     case PROP_BUFFER_DURATION:
602       GST_OBJECT_LOCK (urisrc);
603       g_value_set_int64 (value, urisrc->buffer_duration);
604       GST_OBJECT_UNLOCK (urisrc);
605       break;
606     case PROP_DOWNLOAD:
607       g_value_set_boolean (value, urisrc->download);
608       break;
609     case PROP_USE_BUFFERING:
610       g_value_set_boolean (value, urisrc->use_buffering);
611       break;
612     case PROP_RING_BUFFER_MAX_SIZE:
613       g_value_set_uint64 (value, urisrc->ring_buffer_max_size);
614       break;
615     case PROP_LOW_WATERMARK:
616       g_value_set_double (value, urisrc->low_watermark);
617       break;
618     case PROP_HIGH_WATERMARK:
619       g_value_set_double (value, urisrc->high_watermark);
620       break;
621     case PROP_STATISTICS:
622       g_value_take_boxed (value, get_queue_statistics (urisrc));
623       break;
624     default:
625       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
626       break;
627   }
628 }
629
630 static gboolean
631 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
632 {
633   GstPad *gpad = GST_PAD_CAST (user_data);
634
635   GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event);
636   gst_pad_store_sticky_event (gpad, *event);
637
638   return TRUE;
639 }
640
641 static GstPadProbeReturn
642 pending_pad_blocked (GstPad * pad, GstPadProbeInfo * info, gpointer user_data);
643
644 static GstPadProbeReturn
645 demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data);
646
647 static void
648 free_child_src_pad_info (ChildSrcPadInfo * info)
649 {
650   if (info->cur_caps)
651     gst_caps_unref (info->cur_caps);
652   if (info->output_pad)
653     gst_object_unref (info->output_pad);
654   g_free (info);
655 }
656
657 /* Called by the signal handlers when a demuxer has produced a new stream */
658 static void
659 new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
660     GstURISourceBin * urisrc)
661 {
662   ChildSrcPadInfo *info;
663
664   info = g_new0 (ChildSrcPadInfo, 1);
665   info->src_pad = pad;
666   info->cur_caps = gst_pad_get_current_caps (pad);
667   if (info->cur_caps == NULL)
668     info->cur_caps = gst_pad_query_caps (pad, NULL);
669
670   g_object_set_data_full (G_OBJECT (pad), "urisourcebin.srcpadinfo",
671       info, (GDestroyNotify) free_child_src_pad_info);
672
673   GST_URI_SOURCE_BIN_LOCK (urisrc);
674   /* If the demuxer handles buffering and is streams-aware, we can expose it
675      as-is directly. We still add an event probe to deal with EOS */
676   if (urisrc->demuxer_handles_buffering && urisrc->source_streams_aware) {
677     info->output_pad = gst_object_ref (create_output_pad (urisrc, pad));
678     GST_DEBUG_OBJECT (element,
679         "New streams-aware demuxer pad %s:%s , exposing directly",
680         GST_DEBUG_PAD_NAME (pad));
681     expose_output_pad (urisrc, info->output_pad);
682     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
683   } else {
684     GST_DEBUG_OBJECT (element, "new demuxer pad, name: <%s>. "
685         "Added as pending pad with caps %" GST_PTR_FORMAT,
686         GST_PAD_NAME (pad), info->cur_caps);
687
688     urisrc->pending_pads = g_list_prepend (urisrc->pending_pads, pad);
689     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
690
691     /* Block the pad. On the first data on that pad if it hasn't
692      * been linked to an output slot, we'll create one */
693     info->blocking_probe_id =
694         gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
695         pending_pad_blocked, urisrc, NULL);
696   }
697   info->event_probe_id =
698       gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
699       GST_PAD_PROBE_TYPE_EVENT_FLUSH, demux_pad_events, urisrc, NULL);
700 }
701
702 static GstPadProbeReturn
703 pending_pad_blocked (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
704 {
705   ChildSrcPadInfo *child_info;
706   OutputSlotInfo *slot;
707   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
708   GstCaps *caps;
709   GstPad *output_pad;
710
711   if (!(child_info =
712           g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo")))
713     goto done;
714
715   GST_LOG_OBJECT (urisrc, "Removing pad %" GST_PTR_FORMAT " from pending list",
716       pad);
717
718   GST_URI_SOURCE_BIN_LOCK (urisrc);
719
720   /* Once blocked, this pad is no longer pending, one way or another */
721   urisrc->pending_pads = g_list_remove (urisrc->pending_pads, pad);
722
723   /* If already linked to a slot, nothing more to do */
724   if (child_info->output_slot) {
725     GST_LOG_OBJECT (urisrc, "Pad %" GST_PTR_FORMAT " is linked to queue %"
726         GST_PTR_FORMAT " on slot %p", pad, child_info->output_slot->queue,
727         child_info->output_slot);
728     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
729     goto done;
730   }
731
732   /* If the demuxer handles buffering, we can expose it as-is */
733   if (urisrc->demuxer_handles_buffering) {
734     g_assert (child_info->output_pad == NULL);
735     child_info->output_pad = gst_object_ref (create_output_pad (urisrc, pad));
736     GST_DEBUG_OBJECT (pad, "Demuxer handles buffering, exposing as-is");
737     expose_output_pad (urisrc, child_info->output_pad);
738     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
739     goto done;
740   }
741
742   caps = gst_pad_get_current_caps (pad);
743   if (caps == NULL)
744     caps = gst_pad_query_caps (pad, NULL);
745
746   slot = get_output_slot (urisrc, FALSE, TRUE, caps);
747
748   gst_caps_unref (caps);
749
750   if (slot == NULL) {
751     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
752     goto done;
753   }
754
755   GST_LOG_OBJECT (urisrc, "Pad %" GST_PTR_FORMAT " linked to slot %p", pad,
756       slot);
757
758   child_info->output_slot = slot;
759   slot->linked_info = child_info;
760   gst_pad_link (pad, slot->queue_sinkpad);
761
762   output_pad = gst_object_ref (slot->output_pad);
763
764   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
765
766   expose_output_pad (urisrc, output_pad);
767   gst_object_unref (output_pad);
768
769 done:
770   return GST_PAD_PROBE_REMOVE;
771 }
772
773 /* Called with LOCK held */
774 /* Looks for a suitable pending pad to connect onto this
775  * finishing output slot that's about to EOS */
776 static gboolean
777 link_pending_pad_to_output (GstURISourceBin * urisrc, OutputSlotInfo * slot)
778 {
779   GList *cur;
780   ChildSrcPadInfo *in_info = slot->linked_info;
781   ChildSrcPadInfo *out_info = NULL;
782   gboolean res = FALSE;
783   GstCaps *cur_caps;
784
785   /* Look for a suitable pending pad */
786   cur_caps = gst_pad_get_current_caps (slot->queue_sinkpad);
787
788   GST_DEBUG_OBJECT (urisrc,
789       "Looking for a pending pad with caps %" GST_PTR_FORMAT, cur_caps);
790
791   for (cur = urisrc->pending_pads; cur != NULL; cur = g_list_next (cur)) {
792     GstPad *pending = (GstPad *) (cur->data);
793     ChildSrcPadInfo *cur_info = NULL;
794     if ((cur_info =
795             g_object_get_data (G_OBJECT (pending),
796                 "urisourcebin.srcpadinfo"))) {
797       /* Don't re-link to the same pad in case of EOS while still pending */
798       if (in_info == cur_info)
799         continue;
800       if (cur_caps == NULL || gst_caps_is_equal (cur_caps, cur_info->cur_caps)) {
801         GST_DEBUG_OBJECT (urisrc, "Found suitable pending pad %" GST_PTR_FORMAT
802             " with caps %" GST_PTR_FORMAT " to link to this output slot",
803             cur_info->src_pad, cur_info->cur_caps);
804         out_info = cur_info;
805         break;
806       }
807     }
808   }
809
810   if (cur_caps)
811     gst_caps_unref (cur_caps);
812
813   if (out_info) {
814     /* Block any upstream stuff while we switch out the pad */
815     guint block_id = gst_pad_add_probe (slot->queue_sinkpad,
816         GST_PAD_PROBE_TYPE_BLOCK_UPSTREAM,
817         NULL, NULL, NULL);
818     GST_DEBUG_OBJECT (urisrc, "Linking pending pad %" GST_PTR_FORMAT
819         " to existing output slot %p", out_info->src_pad, slot);
820
821     if (in_info) {
822       gst_pad_unlink (in_info->src_pad, slot->queue_sinkpad);
823       in_info->output_slot = NULL;
824       slot->linked_info = NULL;
825     }
826
827     if (gst_pad_link (out_info->src_pad,
828             slot->queue_sinkpad) == GST_PAD_LINK_OK) {
829       out_info->output_slot = slot;
830       slot->linked_info = out_info;
831
832       BUFFERING_LOCK (urisrc);
833       /* A re-linked slot is no longer EOS */
834       slot->is_eos = FALSE;
835       BUFFERING_UNLOCK (urisrc);
836       res = TRUE;
837       slot->is_eos = FALSE;
838       urisrc->pending_pads =
839           g_list_remove (urisrc->pending_pads, out_info->src_pad);
840     } else {
841       GST_ERROR_OBJECT (urisrc,
842           "Failed to link new demuxer pad to the output slot we tried");
843     }
844     gst_pad_remove_probe (slot->queue_sinkpad, block_id);
845   }
846
847   return res;
848 }
849
850 /* Called with lock held */
851 static gboolean
852 all_slots_are_eos (GstURISourceBin * urisrc)
853 {
854   GSList *tmp;
855
856   for (tmp = urisrc->out_slots; tmp; tmp = tmp->next) {
857     OutputSlotInfo *slot = (OutputSlotInfo *) tmp->data;
858     if (slot->is_eos == FALSE)
859       return FALSE;
860   }
861   return TRUE;
862 }
863
864 static GstPadProbeReturn
865 demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
866 {
867   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
868   ChildSrcPadInfo *child_info;
869   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
870   GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
871
872   if (!(child_info =
873           g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo")))
874     goto done;
875
876   GST_URI_SOURCE_BIN_LOCK (urisrc);
877   /* If not linked to a slot, nothing more to do */
878   if (child_info->output_slot == NULL) {
879     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
880     goto done;
881   }
882
883   switch (GST_EVENT_TYPE (ev)) {
884     case GST_EVENT_EOS:
885     {
886       gboolean all_streams_eos;
887
888       GST_LOG_OBJECT (urisrc, "EOS on pad %" GST_PTR_FORMAT, pad);
889
890       if ((urisrc->pending_pads &&
891               link_pending_pad_to_output (urisrc, child_info->output_slot))) {
892         /* Found a new source pad to give this slot data - no need to send EOS */
893         GST_URI_SOURCE_BIN_UNLOCK (urisrc);
894         ret = GST_PAD_PROBE_DROP;
895         goto done;
896       }
897
898       BUFFERING_LOCK (urisrc);
899       /* Mark that we fed an EOS to this slot */
900       child_info->output_slot->is_eos = TRUE;
901       all_streams_eos = all_slots_are_eos (urisrc);
902       BUFFERING_UNLOCK (urisrc);
903
904       /* EOS means this element is no longer buffering */
905       remove_buffering_msgs (urisrc,
906           GST_OBJECT_CAST (child_info->output_slot->queue));
907
908       /* Mark this custom EOS, replacing the event in the probe data */
909       ev = gst_event_make_writable (ev);
910       GST_PAD_PROBE_INFO_DATA (info) = ev;
911
912       gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev), CUSTOM_EOS_QUARK,
913           (gchar *) CUSTOM_EOS_QUARK_DATA, NULL);
914
915       if (all_streams_eos) {
916         GST_DEBUG_OBJECT (urisrc, "POSTING ABOUT TO FINISH");
917         g_signal_emit (urisrc,
918             gst_uri_source_bin_signals[SIGNAL_ABOUT_TO_FINISH], 0, NULL);
919       }
920     }
921       break;
922     case GST_EVENT_CAPS:
923     {
924       GstCaps *caps;
925       gst_event_parse_caps (ev, &caps);
926       gst_caps_replace (&child_info->cur_caps, caps);
927     }
928       break;
929     case GST_EVENT_STREAM_START:
930     case GST_EVENT_FLUSH_STOP:
931       BUFFERING_LOCK (urisrc);
932       child_info->output_slot->is_eos = FALSE;
933       BUFFERING_UNLOCK (urisrc);
934       break;
935     default:
936       break;
937   }
938
939   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
940
941 done:
942   return ret;
943 }
944
945 static GstPadProbeReturn
946 pre_queue_event_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
947 {
948   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
949   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
950   GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
951
952   switch (GST_EVENT_TYPE (ev)) {
953     case GST_EVENT_EOS:
954     {
955       GST_LOG_OBJECT (urisrc, "EOS on pad %" GST_PTR_FORMAT, pad);
956       GST_DEBUG_OBJECT (urisrc, "POSTING ABOUT TO FINISH");
957       g_signal_emit (urisrc,
958           gst_uri_source_bin_signals[SIGNAL_ABOUT_TO_FINISH], 0, NULL);
959     }
960       break;
961     default:
962       break;
963   }
964   return ret;
965 }
966
967 static GstStructure *
968 get_queue_statistics (GstURISourceBin * urisrc)
969 {
970   GstStructure *ret = NULL;
971   guint min_byte_level = 0, max_byte_level = 0;
972   guint64 min_time_level = 0, max_time_level = 0;
973   gdouble avg_byte_level = 0., avg_time_level = 0.;
974   guint i = 0;
975   GSList *cur;
976
977   GST_URI_SOURCE_BIN_LOCK (urisrc);
978
979   for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
980     OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
981     guint byte_limit = 0;
982     guint64 time_limit = 0;
983
984     g_object_get (slot->queue, "current-level-bytes", &byte_limit,
985         "current-level-time", &time_limit, NULL);
986
987     if (byte_limit < min_byte_level)
988       min_byte_level = byte_limit;
989     if (byte_limit > max_byte_level)
990       max_byte_level = byte_limit;
991     avg_byte_level = (avg_byte_level * i + byte_limit) / (gdouble) (i + 1);
992
993     if (time_limit < min_time_level)
994       min_time_level = time_limit;
995     if (time_limit > max_time_level)
996       max_time_level = time_limit;
997     avg_time_level = (avg_time_level * i + time_limit) / (gdouble) (i + 1);
998
999     i++;
1000   }
1001   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1002
1003   ret = gst_structure_new ("application/x-urisourcebin-stats",
1004       "minimum-byte-level", G_TYPE_UINT, (guint) min_byte_level,
1005       "maximum-byte-level", G_TYPE_UINT, (guint) max_byte_level,
1006       "average-byte-level", G_TYPE_UINT, (guint) avg_byte_level,
1007       "minimum-time-level", G_TYPE_UINT64, (guint64) min_time_level,
1008       "maximum-time-level", G_TYPE_UINT64, (guint64) max_time_level,
1009       "average-time-level", G_TYPE_UINT64, (guint64) avg_time_level, NULL);
1010
1011   return ret;
1012 }
1013
1014 static void
1015 update_queue_values (GstURISourceBin * urisrc)
1016 {
1017   gint64 duration;
1018   guint buffer_size;
1019   gdouble low_watermark, high_watermark;
1020   guint64 cumulative_bitrate = 0;
1021   GSList *cur;
1022
1023   GST_URI_SOURCE_BIN_LOCK (urisrc);
1024   duration = GET_BUFFER_DURATION (urisrc);
1025   buffer_size = GET_BUFFER_SIZE (urisrc);
1026   low_watermark = urisrc->low_watermark;
1027   high_watermark = urisrc->high_watermark;
1028
1029   for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
1030     OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
1031     guint64 bitrate = 0;
1032
1033     if (g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
1034             "bitrate")) {
1035       g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
1036     }
1037
1038     if (bitrate > 0)
1039       cumulative_bitrate += bitrate;
1040     else {
1041       GST_TRACE_OBJECT (urisrc, "Unknown bitrate detected from %" GST_PTR_FORMAT
1042           ", resetting all bitrates", slot->queue);
1043       cumulative_bitrate = 0;
1044       break;
1045     }
1046   }
1047
1048   GST_DEBUG_OBJECT (urisrc, "recalculating queue limits with cumulative "
1049       "bitrate %" G_GUINT64_FORMAT ", buffer size %u, buffer duration %"
1050       G_GINT64_FORMAT, cumulative_bitrate, buffer_size, duration);
1051
1052   for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
1053     OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
1054     guint byte_limit;
1055
1056     if (cumulative_bitrate > 0
1057         && g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
1058             "bitrate")) {
1059       guint64 bitrate;
1060       g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
1061       byte_limit =
1062           gst_util_uint64_scale (buffer_size, bitrate, cumulative_bitrate);
1063     } else {
1064       /* if not all queue's have valid bitrates, use the buffer-size as the
1065        * limit */
1066       byte_limit = buffer_size;
1067     }
1068
1069     GST_DEBUG_OBJECT (urisrc,
1070         "calculated new limits for queue-like element %" GST_PTR_FORMAT
1071         ", bytes:%u, time:%" G_GUINT64_FORMAT
1072         ", low-watermark:%f, high-watermark:%f",
1073         slot->queue, byte_limit, (guint64) duration, low_watermark,
1074         high_watermark);
1075     g_object_set (G_OBJECT (slot->queue), "max-size-bytes", byte_limit,
1076         "max-size-time", (guint64) duration, "low-watermark", low_watermark,
1077         "high-watermark", high_watermark, NULL);
1078   }
1079   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1080 }
1081
1082 static void
1083 on_queue_bitrate_changed (GstElement * queue, GParamSpec * pspec,
1084     gpointer user_data)
1085 {
1086   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
1087
1088   gst_element_call_async (GST_ELEMENT (urisrc),
1089       (GstElementCallAsyncFunc) update_queue_values, NULL, NULL);
1090 }
1091
1092 /* Called with lock held */
1093 static OutputSlotInfo *
1094 get_output_slot (GstURISourceBin * urisrc, gboolean do_download,
1095     gboolean is_adaptive, GstCaps * caps)
1096 {
1097   OutputSlotInfo *slot;
1098   GstPad *srcpad;
1099   GstElement *queue;
1100   const gchar *elem_name;
1101
1102   /* If we have caps, iterate the existing slots and look for an
1103    * unlinked one that can be used */
1104   if (caps && gst_caps_is_fixed (caps)) {
1105     GSList *cur;
1106     GstCaps *cur_caps;
1107
1108     for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
1109       slot = (OutputSlotInfo *) (cur->data);
1110       if (slot->linked_info == NULL) {
1111         cur_caps = gst_pad_get_current_caps (slot->queue_sinkpad);
1112         if (cur_caps == NULL || gst_caps_is_equal (caps, cur_caps)) {
1113           GST_LOG_OBJECT (urisrc, "Found existing slot %p to link to", slot);
1114           gst_caps_unref (cur_caps);
1115           slot->is_eos = FALSE;
1116           return slot;
1117         }
1118         gst_caps_unref (cur_caps);
1119       }
1120     }
1121   }
1122
1123   /* Otherwise create the new slot */
1124   if (do_download)
1125     elem_name = "downloadbuffer";
1126   else
1127     elem_name = "queue2";
1128
1129   queue = gst_element_factory_make (elem_name, NULL);
1130   if (!queue)
1131     goto no_buffer_element;
1132
1133   slot = g_new0 (OutputSlotInfo, 1);
1134   slot->queue = queue;
1135
1136   /* Set the slot onto the queue (needed in buffering msg handling) */
1137   g_object_set_data (G_OBJECT (queue), "urisourcebin.slotinfo", slot);
1138
1139   slot->bitrate_changed_id =
1140       g_signal_connect (G_OBJECT (queue), "notify::bitrate",
1141       (GCallback) on_queue_bitrate_changed, urisrc);
1142
1143   if (do_download) {
1144     gchar *temp_template, *filename;
1145     const gchar *tmp_dir, *prgname;
1146
1147     tmp_dir = g_get_user_cache_dir ();
1148     prgname = g_get_prgname ();
1149     if (prgname == NULL)
1150       prgname = "GStreamer";
1151
1152     filename = g_strdup_printf ("%s-XXXXXX", prgname);
1153
1154     /* build our filename */
1155     temp_template = g_build_filename (tmp_dir, filename, NULL);
1156
1157     GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
1158         temp_template, tmp_dir, prgname, filename);
1159
1160     /* configure progressive download for selected media types */
1161     g_object_set (queue, "temp-template", temp_template, NULL);
1162
1163     g_free (filename);
1164     g_free (temp_template);
1165   } else {
1166     if (is_adaptive) {
1167       GST_LOG_OBJECT (urisrc, "Adding queue for adaptive streaming stream");
1168       g_object_set (queue, "use-buffering", urisrc->use_buffering,
1169           "use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL);
1170     } else {
1171       GST_LOG_OBJECT (urisrc, "Adding queue for buffering");
1172       g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
1173     }
1174
1175     g_object_set (queue, "ring-buffer-max-size",
1176         urisrc->ring_buffer_max_size, NULL);
1177     /* Disable max-size-buffers - queue based on data rate to the default time limit */
1178     g_object_set (queue, "max-size-buffers", 0, NULL);
1179
1180     /* Don't start buffering until the queue is empty (< 1%).
1181      * Start playback when the queue is 60% full, leaving a bit more room
1182      * for upstream to push more without getting bursty */
1183     g_object_set (queue, "low-percent", 1, "high-percent", 60, NULL);
1184
1185     g_object_set (queue, "low-watermark", urisrc->low_watermark,
1186         "high-watermark", urisrc->high_watermark, NULL);
1187   }
1188
1189   /* set the necessary limits on the queue-like elements */
1190   g_object_set (queue, "max-size-bytes", GET_BUFFER_SIZE (urisrc),
1191       "max-size-time", (guint64) GET_BUFFER_DURATION (urisrc), NULL);
1192 #if 0
1193   /* Disabled because this makes initial startup slower for radio streams */
1194   else {
1195     /* Buffer 4 seconds by default - some extra headroom over the
1196      * core default, because we trigger playback sooner */
1197     //g_object_set (queue, "max-size-time", 4 * GST_SECOND, NULL);
1198   }
1199 #endif
1200
1201   /* save queue pointer so we can remove it later */
1202   urisrc->out_slots = g_slist_prepend (urisrc->out_slots, slot);
1203
1204   gst_bin_add (GST_BIN_CAST (urisrc), queue);
1205   gst_element_sync_state_with_parent (queue);
1206
1207   slot->queue_sinkpad = gst_element_get_static_pad (queue, "sink");
1208
1209   /* get the new raw srcpad */
1210   srcpad = gst_element_get_static_pad (queue, "src");
1211   g_object_set_data (G_OBJECT (srcpad), "urisourcebin.slotinfo", slot);
1212
1213   slot->output_pad = create_output_pad (urisrc, srcpad);
1214
1215   gst_object_unref (srcpad);
1216
1217   return slot;
1218
1219 no_buffer_element:
1220   {
1221     post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), elem_name);
1222     return NULL;
1223   }
1224 }
1225
1226 static GstPadProbeReturn
1227 source_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
1228     gpointer user_data)
1229 {
1230   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
1231   GstURISourceBin *urisrc = user_data;
1232
1233   GST_LOG_OBJECT (pad, "%s, urisrc %p", GST_EVENT_TYPE_NAME (event), event);
1234
1235   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS &&
1236       gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (event),
1237           CUSTOM_EOS_QUARK)) {
1238     OutputSlotInfo *slot;
1239     GST_DEBUG_OBJECT (pad, "we received EOS");
1240
1241     /* remove custom-eos */
1242     gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (event), CUSTOM_EOS_QUARK,
1243         NULL, NULL);
1244
1245     GST_URI_SOURCE_BIN_LOCK (urisrc);
1246
1247     slot = g_object_get_data (G_OBJECT (pad), "urisourcebin.slotinfo");
1248
1249     if (slot) {
1250       GstEvent *eos;
1251       guint32 seqnum;
1252
1253       if (slot->linked_info) {
1254         if (slot->is_eos) {
1255           /* linked_info is old input which is still linked without removal */
1256           GST_DEBUG_OBJECT (pad, "push actual EOS");
1257           seqnum = gst_event_get_seqnum (event);
1258           eos = gst_event_new_eos ();
1259           gst_event_set_seqnum (eos, seqnum);
1260           gst_pad_push_event (slot->output_pad, eos);
1261         } else {
1262           /* Do not clear output slot yet. A new input was
1263            * connected. We should just drop this EOS */
1264         }
1265         GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1266         return GST_PAD_PROBE_DROP;
1267       }
1268
1269       seqnum = gst_event_get_seqnum (event);
1270       eos = gst_event_new_eos ();
1271       gst_event_set_seqnum (eos, seqnum);
1272       gst_pad_push_event (slot->output_pad, eos);
1273       free_output_slot_async (urisrc, slot);
1274     }
1275
1276     /* FIXME: Only emit drained if all output pads are done and there's no
1277      * pending pads */
1278     g_signal_emit (urisrc, gst_uri_source_bin_signals[SIGNAL_DRAINED], 0, NULL);
1279
1280     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1281     return GST_PAD_PROBE_DROP;
1282   }
1283   /* never drop events */
1284   return GST_PAD_PROBE_OK;
1285 }
1286
1287 /* called when we found a raw pad to expose. We set up a
1288  * padprobe to detect EOS before exposing the pad.
1289  * Called with LOCK held. */
1290 static GstPad *
1291 create_output_pad (GstURISourceBin * urisrc, GstPad * pad)
1292 {
1293   GstPad *newpad;
1294   GstPadTemplate *pad_tmpl;
1295   gchar *padname;
1296
1297   gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
1298       source_pad_event_probe, urisrc, NULL);
1299
1300   pad_tmpl = gst_static_pad_template_get (&srctemplate);
1301
1302   padname = g_strdup_printf ("src_%u", urisrc->numpads);
1303   urisrc->numpads++;
1304
1305   newpad = gst_ghost_pad_new_from_template (padname, pad, pad_tmpl);
1306   gst_object_unref (pad_tmpl);
1307   g_free (padname);
1308
1309   GST_DEBUG_OBJECT (urisrc, "Created output pad %s:%s for pad %s:%s",
1310       GST_DEBUG_PAD_NAME (newpad), GST_DEBUG_PAD_NAME (pad));
1311
1312   return newpad;
1313 }
1314
1315 static void
1316 expose_output_pad (GstURISourceBin * urisrc, GstPad * pad)
1317 {
1318   GstPad *target;
1319
1320   if (gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (urisrc)))
1321     return;                     /* Pad is already exposed */
1322
1323   target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1324
1325   gst_pad_sticky_events_foreach (target, copy_sticky_events, pad);
1326   gst_object_unref (target);
1327
1328   GST_DEBUG_OBJECT (urisrc, "Exposing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1329
1330   gst_pad_set_active (pad, TRUE);
1331   gst_element_add_pad (GST_ELEMENT_CAST (urisrc), pad);
1332 }
1333
1334 static void
1335 expose_raw_output_pad (GstURISourceBin * urisrc, GstPad * srcpad,
1336     GstPad * output_pad)
1337 {
1338   ChildSrcPadInfo *info = g_new0 (ChildSrcPadInfo, 1);
1339   info->src_pad = srcpad;
1340   info->output_pad = gst_object_ref (output_pad);
1341
1342   g_assert (g_object_get_data (G_OBJECT (srcpad),
1343           "urisourcebin.srcpadinfo") == NULL);
1344
1345   g_object_set_data_full (G_OBJECT (srcpad), "urisourcebin.srcpadinfo",
1346       info, (GDestroyNotify) free_child_src_pad_info);
1347
1348   expose_output_pad (urisrc, output_pad);
1349 }
1350
1351 static void
1352 remove_output_pad (GstURISourceBin * urisrc, GstPad * pad)
1353 {
1354   if (!gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (urisrc)))
1355     return;                     /* Pad is not exposed */
1356
1357   GST_DEBUG_OBJECT (urisrc, "Removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1358
1359   gst_pad_set_active (pad, FALSE);
1360   gst_element_remove_pad (GST_ELEMENT_CAST (urisrc), pad);
1361 }
1362
1363 static void
1364 pad_removed_cb (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
1365 {
1366   ChildSrcPadInfo *info;
1367
1368   GST_DEBUG_OBJECT (element, "pad removed name: <%s:%s>",
1369       GST_DEBUG_PAD_NAME (pad));
1370
1371   /* we only care about srcpads */
1372   if (!GST_PAD_IS_SRC (pad))
1373     return;
1374
1375   if (!(info = g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo")))
1376     goto no_info;
1377
1378   GST_URI_SOURCE_BIN_LOCK (urisrc);
1379   /* Make sure this isn't in the pending pads list */
1380   urisrc->pending_pads = g_list_remove (urisrc->pending_pads, pad);
1381
1382   /* Send EOS to the output slot if the demuxer didn't already */
1383   if (info->output_slot) {
1384     GstStructure *s;
1385     GstEvent *event;
1386     OutputSlotInfo *slot;
1387
1388     slot = info->output_slot;
1389
1390     if (!slot->is_eos && urisrc->pending_pads &&
1391         link_pending_pad_to_output (urisrc, slot)) {
1392       /* Found a new source pad to give this slot data - no need to send EOS */
1393       GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1394       return;
1395     }
1396
1397     BUFFERING_LOCK (urisrc);
1398     /* Unlink this pad from its output slot and send a fake EOS event
1399      * to drain the queue */
1400     slot->is_eos = TRUE;
1401     BUFFERING_UNLOCK (urisrc);
1402
1403     remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
1404
1405     slot->linked_info = NULL;
1406
1407     info->output_slot = NULL;
1408
1409     GST_LOG_OBJECT (element,
1410         "Pad %" GST_PTR_FORMAT " was removed without EOS. Sending.", pad);
1411
1412     event = gst_event_new_eos ();
1413     s = gst_event_writable_structure (event);
1414     gst_structure_set (s, "urisourcebin-custom-eos", G_TYPE_BOOLEAN, TRUE,
1415         NULL);
1416     gst_pad_send_event (slot->queue_sinkpad, event);
1417   } else if (info->output_pad != NULL) {
1418     GST_LOG_OBJECT (element,
1419         "Pad %" GST_PTR_FORMAT " was removed. Unexposing %" GST_PTR_FORMAT,
1420         pad, info->output_pad);
1421     remove_output_pad (urisrc, info->output_pad);
1422   } else {
1423     GST_WARNING_OBJECT (urisrc, "Removed pad has no output slot or pad");
1424   }
1425   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1426
1427   return;
1428
1429   /* ERRORS */
1430 no_info:
1431   {
1432     GST_WARNING_OBJECT (element, "no info found for pad");
1433     return;
1434   }
1435 }
1436
1437 /* helper function to lookup stuff in lists */
1438 static gboolean
1439 array_has_value (const gchar * values[], const gchar * value)
1440 {
1441   gint i;
1442
1443   for (i = 0; values[i]; i++) {
1444     if (g_str_has_prefix (value, values[i]))
1445       return TRUE;
1446   }
1447   return FALSE;
1448 }
1449
1450 static gboolean
1451 array_has_uri_value (const gchar * values[], const gchar * value)
1452 {
1453   gint i;
1454
1455   for (i = 0; values[i]; i++) {
1456     if (!g_ascii_strncasecmp (value, values[i], strlen (values[i])))
1457       return TRUE;
1458   }
1459   return FALSE;
1460 }
1461
1462 /* list of URIs that we consider to be streams and that need buffering.
1463  * We have no mechanism yet to figure this out with a query. */
1464 static const gchar *stream_uris[] = { "http://", "https://", "mms://",
1465   "mmsh://", "mmsu://", "mmst://", "fd://", "myth://", "ssh://",
1466   "ftp://", "sftp://",
1467   NULL
1468 };
1469
1470 /* list of URIs that need a queue because they are pretty bursty */
1471 static const gchar *queue_uris[] = { "cdda://", NULL };
1472
1473 /* blacklisted URIs, we know they will always fail. */
1474 static const gchar *blacklisted_uris[] = { NULL };
1475
1476 /* media types that use adaptive streaming */
1477 static const gchar *adaptive_media[] = {
1478   "application/x-hls", "application/vnd.ms-sstr+xml",
1479   "application/dash+xml", NULL
1480 };
1481
1482 #define IS_STREAM_URI(uri)          (array_has_uri_value (stream_uris, uri))
1483 #define IS_QUEUE_URI(uri)           (array_has_uri_value (queue_uris, uri))
1484 #define IS_BLACKLISTED_URI(uri)     (array_has_uri_value (blacklisted_uris, uri))
1485 #define IS_ADAPTIVE_MEDIA(media)    (array_has_value (adaptive_media, media))
1486
1487 /*
1488  * Generate and configure a source element.
1489  */
1490 static GstElement *
1491 gen_source_element (GstURISourceBin * urisrc)
1492 {
1493   GObjectClass *source_class;
1494   GstElement *source;
1495   GParamSpec *pspec;
1496   GstQuery *query;
1497   GstSchedulingFlags flags;
1498   GError *err = NULL;
1499
1500   if (!urisrc->uri)
1501     goto no_uri;
1502
1503   GST_LOG_OBJECT (urisrc, "finding source for %s", urisrc->uri);
1504
1505   if (!gst_uri_is_valid (urisrc->uri))
1506     goto invalid_uri;
1507
1508   if (IS_BLACKLISTED_URI (urisrc->uri))
1509     goto uri_blacklisted;
1510
1511   source = gst_element_make_from_uri (GST_URI_SRC, urisrc->uri, NULL, &err);
1512   if (!source)
1513     goto no_source;
1514
1515   GST_LOG_OBJECT (urisrc, "found source type %s", G_OBJECT_TYPE_NAME (source));
1516
1517   urisrc->is_stream = IS_STREAM_URI (urisrc->uri);
1518
1519   query = gst_query_new_scheduling ();
1520   if (gst_element_query (source, query)) {
1521     gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
1522     if ((flags & GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED))
1523       urisrc->is_stream = TRUE;
1524   }
1525   gst_query_unref (query);
1526
1527   source_class = G_OBJECT_GET_CLASS (source);
1528
1529   if (urisrc->is_stream) {
1530     /* Live sources are not streamable */
1531     pspec = g_object_class_find_property (source_class, "is-live");
1532     if (pspec && G_PARAM_SPEC_VALUE_TYPE (pspec) == G_TYPE_BOOLEAN) {
1533       gboolean is_live;
1534       g_object_get (G_OBJECT (source), "is-live", &is_live, NULL);
1535       if (is_live)
1536         urisrc->is_stream = FALSE;
1537     }
1538   }
1539
1540   GST_LOG_OBJECT (urisrc, "source is stream: %d", urisrc->is_stream);
1541
1542   pspec = g_object_class_find_property (source_class, "connection-speed");
1543   if (pspec != NULL) {
1544     guint64 speed = urisrc->connection_speed / 1000;
1545     gboolean wrong_type = FALSE;
1546
1547     if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_UINT) {
1548       GParamSpecUInt *pspecuint = G_PARAM_SPEC_UINT (pspec);
1549
1550       speed = CLAMP (speed, pspecuint->minimum, pspecuint->maximum);
1551     } else if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_INT) {
1552       GParamSpecInt *pspecint = G_PARAM_SPEC_INT (pspec);
1553
1554       speed = CLAMP (speed, pspecint->minimum, pspecint->maximum);
1555     } else if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_UINT64) {
1556       GParamSpecUInt64 *pspecuint = G_PARAM_SPEC_UINT64 (pspec);
1557
1558       speed = CLAMP (speed, pspecuint->minimum, pspecuint->maximum);
1559     } else if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_INT64) {
1560       GParamSpecInt64 *pspecint = G_PARAM_SPEC_INT64 (pspec);
1561
1562       speed = CLAMP (speed, pspecint->minimum, pspecint->maximum);
1563     } else {
1564       GST_WARNING_OBJECT (urisrc,
1565           "The connection speed property %" G_GUINT64_FORMAT
1566           " of type %s is not useful. Not setting it", speed,
1567           g_type_name (G_PARAM_SPEC_TYPE (pspec)));
1568       wrong_type = TRUE;
1569     }
1570
1571     if (!wrong_type) {
1572       g_object_set (source, "connection-speed", speed, NULL);
1573
1574       GST_DEBUG_OBJECT (urisrc,
1575           "setting connection-speed=%" G_GUINT64_FORMAT " to source element",
1576           speed);
1577     }
1578   }
1579
1580   return source;
1581
1582   /* ERRORS */
1583 no_uri:
1584   {
1585     GST_ELEMENT_ERROR (urisrc, RESOURCE, NOT_FOUND,
1586         (_("No URI specified to play from.")), (NULL));
1587     return NULL;
1588   }
1589 invalid_uri:
1590   {
1591     GST_ELEMENT_ERROR (urisrc, RESOURCE, NOT_FOUND,
1592         (_("Invalid URI \"%s\"."), urisrc->uri), (NULL));
1593     g_clear_error (&err);
1594     return NULL;
1595   }
1596 uri_blacklisted:
1597   {
1598     GST_ELEMENT_ERROR (urisrc, RESOURCE, FAILED,
1599         (_("This stream type cannot be played yet.")), (NULL));
1600     return NULL;
1601   }
1602 no_source:
1603   {
1604     /* whoops, could not create the source element, dig a little deeper to
1605      * figure out what might be wrong. */
1606     if (err != NULL && err->code == GST_URI_ERROR_UNSUPPORTED_PROTOCOL) {
1607       gchar *prot;
1608
1609       prot = gst_uri_get_protocol (urisrc->uri);
1610       if (prot == NULL)
1611         goto invalid_uri;
1612
1613       gst_element_post_message (GST_ELEMENT_CAST (urisrc),
1614           gst_missing_uri_source_message_new (GST_ELEMENT (urisrc), prot));
1615
1616       GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN,
1617           (_("No URI handler implemented for \"%s\"."), prot), (NULL));
1618
1619       g_free (prot);
1620     } else {
1621       GST_ELEMENT_ERROR (urisrc, RESOURCE, NOT_FOUND,
1622           ("%s", (err) ? err->message : "URI was not accepted by any element"),
1623           ("No element accepted URI '%s'", urisrc->uri));
1624     }
1625
1626     g_clear_error (&err);
1627     return NULL;
1628   }
1629 }
1630
1631 static gboolean
1632 is_all_raw_caps (GstCaps * caps, GstCaps * rawcaps, gboolean * all_raw)
1633 {
1634   GstCaps *intersection;
1635   gint capssize;
1636   gboolean res = FALSE;
1637
1638   if (caps == NULL)
1639     return FALSE;
1640
1641   capssize = gst_caps_get_size (caps);
1642   /* no caps, skip and move to the next pad */
1643   if (capssize == 0 || gst_caps_is_empty (caps) || gst_caps_is_any (caps))
1644     goto done;
1645
1646   intersection = gst_caps_intersect (caps, rawcaps);
1647   *all_raw = !gst_caps_is_empty (intersection)
1648       && (gst_caps_get_size (intersection) == capssize);
1649   gst_caps_unref (intersection);
1650
1651   res = TRUE;
1652
1653 done:
1654   return res;
1655 }
1656
1657 /**
1658  * has_all_raw_caps:
1659  * @pad: a #GstPad
1660  * @all_raw: pointer to hold the result
1661  *
1662  * check if the caps of the pad are all raw. The caps are all raw if
1663  * all of its structures contain audio/x-raw or video/x-raw.
1664  *
1665  * Returns: %FALSE @pad has no caps. Else TRUE and @all_raw set t the result.
1666  */
1667 static gboolean
1668 has_all_raw_caps (GstPad * pad, GstCaps * rawcaps, gboolean * all_raw)
1669 {
1670   GstCaps *caps;
1671   gboolean res = FALSE;
1672
1673   caps = gst_pad_query_caps (pad, NULL);
1674
1675   GST_DEBUG_OBJECT (pad, "have caps %" GST_PTR_FORMAT, caps);
1676
1677   res = is_all_raw_caps (caps, rawcaps, all_raw);
1678
1679   gst_caps_unref (caps);
1680   return res;
1681 }
1682
1683 static void
1684 post_missing_plugin_error (GstElement * urisrc, const gchar * element_name)
1685 {
1686   GstMessage *msg;
1687
1688   msg = gst_missing_element_message_new (urisrc, element_name);
1689   gst_element_post_message (urisrc, msg);
1690
1691   GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN,
1692       (_("Missing element '%s' - check your GStreamer installation."),
1693           element_name), (NULL));
1694 }
1695
1696 /**
1697  * analyse_source_and_expose_raw_pads:
1698  * @urisrc: a #GstURISourceBin
1699  * @all_pads_raw: are all pads raw data
1700  * @have_out: does the source have output
1701  * @is_dynamic: is this a dynamic source
1702  *
1703  * Check the source of @urisrc and collect information about it. Any pad
1704  * exposing raw data will be exposed directly.
1705  *
1706  * @is_raw will be set to TRUE if the source only produces raw pads. When this
1707  * function returns, all of the raw pad of the source will be added
1708  * to @urisrc
1709  *
1710  * @have_out: will be set to TRUE if the source has output pads.
1711  *
1712  * @is_dynamic: TRUE if the element will create (more) pads dynamically later
1713  * on.
1714  *
1715  * Returns: FALSE if a fatal error occurred while scanning.
1716  */
1717 static gboolean
1718 analyse_source_and_expose_raw_pads (GstURISourceBin * urisrc,
1719     gboolean * all_pads_raw, gboolean * have_out, gboolean * is_dynamic)
1720 {
1721   GstElementClass *elemclass;
1722   GList *walk;
1723   GstIterator *pads_iter;
1724   gboolean done = FALSE;
1725   gboolean res = TRUE;
1726   GstPad *pad;
1727   GValue item = { 0, };
1728   guint nb_raw = 0;
1729   guint nb_pads = 0;
1730   GstCaps *rawcaps = DEFAULT_CAPS;
1731   gboolean pad_is_raw;
1732   gboolean use_queue;
1733
1734   *have_out = FALSE;
1735   *all_pads_raw = FALSE;
1736   *is_dynamic = FALSE;
1737
1738   /* Add buffering elements on raw pads only is very specific conditions */
1739   use_queue = urisrc->use_buffering && IS_QUEUE_URI (urisrc->uri);
1740
1741   pads_iter = gst_element_iterate_src_pads (urisrc->source);
1742   while (!done) {
1743     switch (gst_iterator_next (pads_iter, &item)) {
1744       case GST_ITERATOR_ERROR:
1745         res = FALSE;
1746         /* FALLTHROUGH */
1747       case GST_ITERATOR_DONE:
1748         done = TRUE;
1749         break;
1750       case GST_ITERATOR_RESYNC:
1751         /* reset results and resync */
1752         *have_out = FALSE;
1753         *all_pads_raw = FALSE;
1754         *is_dynamic = FALSE;
1755         nb_pads = nb_raw = 0;
1756         gst_iterator_resync (pads_iter);
1757         break;
1758       case GST_ITERATOR_OK:
1759         pad = g_value_dup_object (&item);
1760         /* we now officially have an output pad */
1761         *have_out = TRUE;
1762
1763         /* if FALSE, this pad has no caps and we continue with the next pad. */
1764         if (!has_all_raw_caps (pad, rawcaps, &pad_is_raw)) {
1765           gst_object_unref (pad);
1766           g_value_reset (&item);
1767           break;
1768         }
1769
1770         nb_pads++;
1771         /* caps on source pad are all raw, we can add the pad */
1772         if (pad_is_raw) {
1773           GstPad *output_pad;
1774
1775           nb_raw++;
1776           GST_URI_SOURCE_BIN_LOCK (urisrc);
1777           if (use_queue) {
1778             OutputSlotInfo *slot = get_output_slot (urisrc, FALSE, FALSE, NULL);
1779             if (!slot)
1780               goto no_slot;
1781
1782             gst_pad_link (pad, slot->queue_sinkpad);
1783
1784             /* get the new raw srcpad */
1785             output_pad = gst_object_ref (slot->output_pad);
1786
1787             GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1788
1789             expose_output_pad (urisrc, output_pad);
1790             gst_object_unref (output_pad);
1791           } else {
1792             output_pad = create_output_pad (urisrc, pad);
1793
1794             GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1795
1796             expose_raw_output_pad (urisrc, pad, output_pad);
1797           }
1798           gst_object_unref (pad);
1799         } else {
1800           gst_object_unref (pad);
1801         }
1802         g_value_reset (&item);
1803         break;
1804     }
1805   }
1806   g_value_unset (&item);
1807   gst_iterator_free (pads_iter);
1808   gst_caps_unref (rawcaps);
1809
1810   /* check for padtemplates that list SOMETIMES pads to
1811    * determine if the element is dynamic. */
1812   elemclass = GST_ELEMENT_GET_CLASS (urisrc->source);
1813   walk = gst_element_class_get_pad_template_list (elemclass);
1814   while (walk != NULL) {
1815     GstPadTemplate *templ;
1816
1817     templ = (GstPadTemplate *) walk->data;
1818     if (GST_PAD_TEMPLATE_DIRECTION (templ) == GST_PAD_SRC) {
1819       if (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_SOMETIMES)
1820         *is_dynamic = TRUE;
1821       break;
1822     }
1823     walk = g_list_next (walk);
1824   }
1825
1826   if (nb_pads && nb_pads == nb_raw)
1827     *all_pads_raw = TRUE;
1828
1829   return res;
1830 no_slot:
1831   {
1832     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1833     gst_object_unref (pad);
1834     g_value_unset (&item);
1835     gst_iterator_free (pads_iter);
1836     gst_caps_unref (rawcaps);
1837
1838     return FALSE;
1839   }
1840 }
1841
1842 /* Remove any adaptive demuxer element */
1843 static void
1844 remove_demuxer (GstURISourceBin * bin)
1845 {
1846   if (bin->demuxer) {
1847     GST_DEBUG_OBJECT (bin, "removing old demuxer element");
1848     gst_element_set_state (bin->demuxer, GST_STATE_NULL);
1849     gst_bin_remove (GST_BIN_CAST (bin), bin->demuxer);
1850     bin->demuxer = NULL;
1851     bin->demuxer_handles_buffering = FALSE;
1852   }
1853 }
1854
1855 /* make a demuxer and connect to all the signals */
1856 static GstElement *
1857 make_demuxer (GstURISourceBin * urisrc, GstCaps * caps)
1858 {
1859   GList *factories, *eligible, *cur;
1860   GstElement *demuxer = NULL;
1861   GParamSpec *pspec;
1862
1863   GST_LOG_OBJECT (urisrc, "making new adaptive demuxer");
1864
1865   /* now create the demuxer element */
1866
1867   /* FIXME: Fire a signal to get the demuxer? */
1868   factories = gst_element_factory_list_get_elements
1869       (GST_ELEMENT_FACTORY_TYPE_DEMUXER, GST_RANK_MARGINAL);
1870   eligible =
1871       gst_element_factory_list_filter (factories, caps, GST_PAD_SINK,
1872       gst_caps_is_fixed (caps));
1873   gst_plugin_feature_list_free (factories);
1874
1875   if (eligible == NULL)
1876     goto no_demuxer;
1877
1878   eligible = g_list_sort (eligible, gst_plugin_feature_rank_compare_func);
1879
1880   for (cur = eligible; cur != NULL; cur = g_list_next (cur)) {
1881     GstElementFactory *factory = (GstElementFactory *) (cur->data);
1882     const gchar *klass =
1883         gst_element_factory_get_metadata (factory, GST_ELEMENT_METADATA_KLASS);
1884
1885     /* Can't be a demuxer unless it has Demux in the klass name */
1886     if (!strstr (klass, "Demux") || !strstr (klass, "Adaptive"))
1887       continue;
1888
1889     demuxer = gst_element_factory_create (factory, NULL);
1890     break;
1891   }
1892   gst_plugin_feature_list_free (eligible);
1893
1894   if (!demuxer)
1895     goto no_demuxer;
1896
1897   GST_DEBUG_OBJECT (urisrc, "Created adaptive demuxer %" GST_PTR_FORMAT,
1898       demuxer);
1899
1900   /* set up callbacks to create the links between
1901    * demuxer streams and output */
1902   g_signal_connect (demuxer,
1903       "pad-added", G_CALLBACK (new_demuxer_pad_added_cb), urisrc);
1904   g_signal_connect (demuxer,
1905       "pad-removed", G_CALLBACK (pad_removed_cb), urisrc);
1906
1907   /* Propagate connection-speed property */
1908   pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (demuxer),
1909       "connection-speed");
1910   if (pspec != NULL)
1911     g_object_set (demuxer,
1912         "connection-speed", urisrc->connection_speed / 1000, NULL);
1913
1914   return demuxer;
1915
1916   /* ERRORS */
1917 no_demuxer:
1918   {
1919     /* FIXME: Fire the right error */
1920     GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN, (NULL),
1921         ("No demuxer element, check your installation"));
1922     return NULL;
1923   }
1924 }
1925
1926 static void
1927 handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps)
1928 {
1929   gboolean is_raw;
1930   GstStructure *s;
1931   const gchar *media_type;
1932   gboolean do_download = FALSE;
1933
1934   GST_URI_SOURCE_BIN_LOCK (urisrc);
1935
1936   /* if this is a pad with all raw caps, we can expose it */
1937   if (is_all_raw_caps (caps, DEFAULT_CAPS, &is_raw) && is_raw) {
1938     GstPad *output_pad;
1939
1940     GST_DEBUG_OBJECT (urisrc, "Found pad with raw caps %" GST_PTR_FORMAT
1941         ", exposing", caps);
1942     output_pad = create_output_pad (urisrc, srcpad);
1943     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1944
1945     expose_raw_output_pad (urisrc, srcpad, output_pad);
1946     return;
1947   }
1948   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1949
1950   s = gst_caps_get_structure (caps, 0);
1951   media_type = gst_structure_get_name (s);
1952
1953   urisrc->is_adaptive = IS_ADAPTIVE_MEDIA (media_type);
1954
1955   if (urisrc->is_adaptive) {
1956     GstPad *sinkpad;
1957     GstPadLinkReturn link_res;
1958     GstQuery *query;
1959
1960     urisrc->demuxer = make_demuxer (urisrc, caps);
1961     if (!urisrc->demuxer)
1962       goto no_demuxer;
1963     gst_bin_add (GST_BIN_CAST (urisrc), urisrc->demuxer);
1964
1965     /* Query the demuxer to see if it can handle buffering */
1966     query = gst_query_new_buffering (GST_FORMAT_TIME);
1967     urisrc->demuxer_handles_buffering =
1968         gst_element_query (urisrc->demuxer, query);
1969     gst_query_unref (query);
1970     GST_DEBUG_OBJECT (urisrc, "Demuxer handles buffering : %d",
1971         urisrc->demuxer_handles_buffering);
1972
1973     sinkpad = gst_element_get_static_pad (urisrc->demuxer, "sink");
1974     if (sinkpad == NULL)
1975       goto no_demuxer_sink;
1976
1977     link_res = gst_pad_link (srcpad, sinkpad);
1978
1979     gst_object_unref (sinkpad);
1980     if (link_res != GST_PAD_LINK_OK)
1981       goto could_not_link;
1982
1983     gst_element_sync_state_with_parent (urisrc->demuxer);
1984   } else if (!urisrc->is_stream) {
1985     GstPad *output_pad;
1986     /* We don't need slot here, expose immediately */
1987     GST_URI_SOURCE_BIN_LOCK (urisrc);
1988     output_pad = create_output_pad (urisrc, srcpad);
1989     expose_raw_output_pad (urisrc, srcpad, output_pad);
1990     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1991   } else {
1992     OutputSlotInfo *slot;
1993     GstPad *output_pad;
1994
1995     /* only enable download buffering if the upstream duration is known */
1996     if (urisrc->download) {
1997       GstQuery *query = gst_query_new_duration (GST_FORMAT_BYTES);
1998       if (gst_pad_query (srcpad, query)) {
1999         gint64 dur;
2000         gst_query_parse_duration (query, NULL, &dur);
2001         do_download = (dur != -1);
2002       }
2003       gst_query_unref (query);
2004     }
2005
2006     GST_DEBUG_OBJECT (urisrc, "check media-type %s, do_download:%d", media_type,
2007         do_download);
2008
2009     GST_URI_SOURCE_BIN_LOCK (urisrc);
2010     slot = get_output_slot (urisrc, do_download, FALSE, NULL);
2011
2012     if (slot == NULL
2013         || gst_pad_link (srcpad, slot->queue_sinkpad) != GST_PAD_LINK_OK)
2014       goto could_not_link;
2015
2016     gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
2017         pre_queue_event_probe, urisrc, NULL);
2018
2019     output_pad = gst_object_ref (slot->output_pad);
2020     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
2021
2022     expose_output_pad (urisrc, output_pad);
2023     gst_object_unref (output_pad);
2024   }
2025
2026   return;
2027
2028   /* ERRORS */
2029 no_demuxer:
2030   {
2031     /* error was posted */
2032     return;
2033   }
2034 no_demuxer_sink:
2035   {
2036     GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
2037         (NULL), ("Adaptive demuxer element has no 'sink' pad"));
2038     return;
2039   }
2040 could_not_link:
2041   {
2042     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
2043     GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
2044         (NULL), ("Can't link typefind to adaptive demuxer element"));
2045     return;
2046   }
2047 }
2048
2049 /* signaled when we have a stream and we need to configure the download
2050  * buffering or regular buffering */
2051 static void
2052 type_found (GstElement * typefind, guint probability,
2053     GstCaps * caps, GstURISourceBin * urisrc)
2054 {
2055   GstPad *srcpad = gst_element_get_static_pad (typefind, "src");
2056
2057   GST_DEBUG_OBJECT (urisrc, "typefind found caps %" GST_PTR_FORMAT
2058       " on pad %" GST_PTR_FORMAT, caps, srcpad);
2059   handle_new_pad (urisrc, srcpad, caps);
2060
2061   gst_object_unref (GST_OBJECT (srcpad));
2062 }
2063
2064 /* setup typefind for any source. This will first plug a typefind element to the
2065  * source. After we find the type, we decide to whether to plug an adaptive
2066  * demuxer, or just link through queue2 (if needed) and expose the data */
2067 static gboolean
2068 setup_typefind (GstURISourceBin * urisrc, GstPad * srcpad)
2069 {
2070   GstElement *typefind;
2071
2072   /* now create the typefind element */
2073   typefind = gst_element_factory_make ("typefind", NULL);
2074   if (!typefind)
2075     goto no_typefind;
2076
2077   /* Make sure the bin doesn't set the typefind running yet */
2078   gst_element_set_locked_state (typefind, TRUE);
2079
2080   gst_bin_add (GST_BIN_CAST (urisrc), typefind);
2081
2082   if (!srcpad) {
2083     if (!gst_element_link_pads (urisrc->source, NULL, typefind, "sink"))
2084       goto could_not_link;
2085   } else {
2086     GstPad *sinkpad = gst_element_get_static_pad (typefind, "sink");
2087     GstPadLinkReturn ret;
2088
2089     ret = gst_pad_link (srcpad, sinkpad);
2090     gst_object_unref (sinkpad);
2091     if (ret != GST_PAD_LINK_OK)
2092       goto could_not_link;
2093   }
2094
2095   urisrc->typefinds = g_list_append (urisrc->typefinds, typefind);
2096
2097   /* connect a signal to find out when the typefind element found
2098    * a type */
2099   g_signal_connect (typefind, "have-type", G_CALLBACK (type_found), urisrc);
2100
2101   /* Now it can start */
2102   gst_element_set_locked_state (typefind, FALSE);
2103   gst_element_sync_state_with_parent (typefind);
2104
2105   return TRUE;
2106
2107   /* ERRORS */
2108 no_typefind:
2109   {
2110     post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), "typefind");
2111     GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN, (NULL),
2112         ("No typefind element, check your installation"));
2113     return FALSE;
2114   }
2115 could_not_link:
2116   {
2117     GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
2118         (NULL), ("Can't link source to typefind element"));
2119     gst_bin_remove (GST_BIN_CAST (urisrc), typefind);
2120     return FALSE;
2121   }
2122 }
2123
2124 static void
2125 free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc)
2126 {
2127   GST_DEBUG_OBJECT (urisrc, "removing old queue element and freeing slot %p",
2128       slot);
2129   if (slot->bitrate_changed_id > 0)
2130     g_signal_handler_disconnect (slot->queue, slot->bitrate_changed_id);
2131   slot->bitrate_changed_id = 0;
2132
2133   gst_element_set_locked_state (slot->queue, TRUE);
2134   gst_element_set_state (slot->queue, GST_STATE_NULL);
2135   remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
2136   gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue);
2137
2138   gst_object_unref (slot->queue_sinkpad);
2139
2140   /* deactivate and remove the srcpad */
2141   gst_pad_set_active (slot->output_pad, FALSE);
2142   gst_element_remove_pad (GST_ELEMENT_CAST (urisrc), slot->output_pad);
2143
2144   g_free (slot);
2145 }
2146
2147 static void
2148 call_free_output_slot (GstURISourceBin * urisrc, OutputSlotInfo * slot)
2149 {
2150   GST_LOG_OBJECT (urisrc, "free output slot in thread pool");
2151   free_output_slot (slot, urisrc);
2152 }
2153
2154 /* must be called with GST_URI_SOURCE_BIN_LOCK */
2155 static void
2156 free_output_slot_async (GstURISourceBin * urisrc, OutputSlotInfo * slot)
2157 {
2158   GST_LOG_OBJECT (urisrc, "pushing output slot on thread pool to free");
2159   urisrc->out_slots = g_slist_remove (urisrc->out_slots, slot);
2160   gst_element_call_async (GST_ELEMENT_CAST (urisrc),
2161       (GstElementCallAsyncFunc) call_free_output_slot, slot, NULL);
2162 }
2163
2164 static void
2165 unexpose_raw_pad_func (const GValue * item, GstURISourceBin * urisrc)
2166 {
2167   GstPad *pad = g_value_get_object (item);
2168   ChildSrcPadInfo *info =
2169       g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo");
2170
2171   if (info && info->output_pad != NULL)
2172     remove_output_pad (urisrc, info->output_pad);
2173 }
2174
2175 static void
2176 unexpose_src_pads (GstURISourceBin * urisrc, GstElement * element)
2177 {
2178   GstIterator *pads_iter;
2179
2180   pads_iter = gst_element_iterate_src_pads (element);
2181   gst_iterator_foreach (pads_iter,
2182       (GstIteratorForeachFunction) unexpose_raw_pad_func, urisrc);
2183   gst_iterator_free (pads_iter);
2184 }
2185
2186 static void
2187 remove_typefind (GstElement * typefind, GstURISourceBin * urisrc)
2188 {
2189   unexpose_src_pads (urisrc, typefind);
2190   gst_element_set_state (typefind, GST_STATE_NULL);
2191   gst_bin_remove (GST_BIN_CAST (urisrc), typefind);
2192 }
2193
2194 /* remove source and all related elements */
2195 static void
2196 remove_source (GstURISourceBin * urisrc)
2197 {
2198   if (urisrc->source) {
2199     GstElement *source = urisrc->source;
2200
2201     GST_DEBUG_OBJECT (urisrc, "removing old src element");
2202     unexpose_src_pads (urisrc, source);
2203     gst_element_set_state (source, GST_STATE_NULL);
2204
2205     if (urisrc->src_np_sig_id) {
2206       g_signal_handler_disconnect (source, urisrc->src_np_sig_id);
2207       urisrc->src_np_sig_id = 0;
2208     }
2209     gst_bin_remove (GST_BIN_CAST (urisrc), source);
2210     urisrc->source = NULL;
2211   }
2212
2213   if (urisrc->typefinds) {
2214     GST_DEBUG_OBJECT (urisrc, "removing old typefind elements");
2215     g_list_foreach (urisrc->typefinds, (GFunc) remove_typefind, urisrc);
2216     g_list_free (urisrc->typefinds);
2217     urisrc->typefinds = NULL;
2218   }
2219
2220   GST_URI_SOURCE_BIN_LOCK (urisrc);
2221   g_slist_foreach (urisrc->out_slots, (GFunc) free_output_slot, urisrc);
2222   g_slist_free (urisrc->out_slots);
2223   urisrc->out_slots = NULL;
2224   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
2225
2226   if (urisrc->demuxer)
2227     remove_demuxer (urisrc);
2228 }
2229
2230 /* is called when a dynamic source element created a new pad. */
2231 static void
2232 source_new_pad (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
2233 {
2234   GstCaps *caps;
2235
2236   GST_DEBUG_OBJECT (urisrc, "Found new pad %s.%s in source element %s",
2237       GST_DEBUG_PAD_NAME (pad), GST_ELEMENT_NAME (element));
2238   caps = gst_pad_get_current_caps (pad);
2239   if (caps == NULL)
2240     setup_typefind (urisrc, pad);
2241   else {
2242     handle_new_pad (urisrc, pad, caps);
2243     gst_caps_unref (caps);
2244   }
2245 }
2246
2247 /* construct and run the source and demuxer elements until we found
2248  * all the streams or until a preroll queue has been filled.
2249 */
2250 static gboolean
2251 setup_source (GstURISourceBin * urisrc)
2252 {
2253   gboolean all_pads_raw, have_out, is_dynamic;
2254
2255   GST_DEBUG_OBJECT (urisrc, "setup source");
2256
2257   /* delete old src */
2258   remove_source (urisrc);
2259
2260   /* create and configure an element that can handle the uri */
2261   if (!(urisrc->source = gen_source_element (urisrc)))
2262     goto no_source;
2263
2264   /* state will be merged later - if file is not found, error will be
2265    * handled by the application right after. */
2266   gst_bin_add (GST_BIN_CAST (urisrc), urisrc->source);
2267
2268   /* notify of the new source used */
2269   g_object_notify (G_OBJECT (urisrc), "source");
2270
2271   g_signal_emit (urisrc, gst_uri_source_bin_signals[SIGNAL_SOURCE_SETUP],
2272       0, urisrc->source);
2273
2274   /* see if the source element emits raw audio/video all by itself,
2275    * if so, we can create streams for the pads and be done with it.
2276    * Also check that is has source pads, if not, we assume it will
2277    * do everything itself.  */
2278   if (!analyse_source_and_expose_raw_pads (urisrc, &all_pads_raw, &have_out,
2279           &is_dynamic))
2280     goto invalid_source;
2281
2282   if (!is_dynamic) {
2283     if (all_pads_raw) {
2284       GST_DEBUG_OBJECT (urisrc, "Source provides all raw data");
2285       /* source provides raw data, we added the pads and we can now signal a
2286        * no_more pads because we are done. */
2287       gst_element_no_more_pads (GST_ELEMENT_CAST (urisrc));
2288       return TRUE;
2289     }
2290     if (!have_out)
2291       goto no_pads;
2292   } else {
2293     GST_DEBUG_OBJECT (urisrc, "Source has dynamic output pads");
2294     /* connect a handler for the new-pad signal */
2295     urisrc->src_np_sig_id =
2296         g_signal_connect (urisrc->source, "pad-added",
2297         G_CALLBACK (source_new_pad), urisrc);
2298   }
2299
2300   if (all_pads_raw) {
2301     GST_DEBUG_OBJECT (urisrc,
2302         "Got raw srcpads on a dynamic source, using them as is.");
2303
2304     return TRUE;
2305   } else if (urisrc->is_stream) {
2306     GST_DEBUG_OBJECT (urisrc, "Setting up streaming");
2307     /* do the stream things here */
2308     if (!setup_typefind (urisrc, NULL))
2309       goto streaming_failed;
2310   } else {
2311     GstIterator *pads_iter;
2312     gboolean done = FALSE;
2313
2314     /* Expose all non-raw srcpads */
2315     pads_iter = gst_element_iterate_src_pads (urisrc->source);
2316     while (!done) {
2317       GValue item = { 0, };
2318       GstPad *pad;
2319
2320       switch (gst_iterator_next (pads_iter, &item)) {
2321         case GST_ITERATOR_ERROR:
2322           GST_WARNING_OBJECT (urisrc, "Error iterating pads on source element");
2323           /* FALLTHROUGH */
2324         case GST_ITERATOR_DONE:
2325           done = TRUE;
2326           break;
2327         case GST_ITERATOR_RESYNC:
2328           /* reset results and resync */
2329           gst_iterator_resync (pads_iter);
2330           break;
2331         case GST_ITERATOR_OK:
2332           pad = g_value_get_object (&item);
2333           if (!setup_typefind (urisrc, pad)) {
2334             gst_iterator_free (pads_iter);
2335             goto streaming_failed;
2336           }
2337           g_value_reset (&item);
2338           break;
2339       }
2340     }
2341     gst_iterator_free (pads_iter);
2342   }
2343
2344   return TRUE;
2345
2346   /* ERRORS */
2347 no_source:
2348   {
2349     /* error message was already posted */
2350     return FALSE;
2351   }
2352 invalid_source:
2353   {
2354     GST_ELEMENT_ERROR (urisrc, CORE, FAILED,
2355         (_("Source element is invalid.")), (NULL));
2356     return FALSE;
2357   }
2358 no_pads:
2359   {
2360     GST_ELEMENT_ERROR (urisrc, CORE, FAILED,
2361         (_("Source element has no pads.")), (NULL));
2362     return FALSE;
2363   }
2364 streaming_failed:
2365   {
2366     /* message was posted */
2367     return FALSE;
2368   }
2369 }
2370
2371 static void
2372 value_list_append_structure_list (GValue * list_val, GstStructure ** first,
2373     GList * structure_list)
2374 {
2375   GList *l;
2376
2377   for (l = structure_list; l != NULL; l = l->next) {
2378     GValue val = { 0, };
2379
2380     if (*first == NULL)
2381       *first = gst_structure_copy ((GstStructure *) l->data);
2382
2383     g_value_init (&val, GST_TYPE_STRUCTURE);
2384     g_value_take_boxed (&val, gst_structure_copy ((GstStructure *) l->data));
2385     gst_value_list_append_value (list_val, &val);
2386     g_value_unset (&val);
2387   }
2388 }
2389
2390 /* if it's a redirect message with multiple redirect locations we might
2391  * want to pick a different 'best' location depending on the required
2392  * bitrates and the connection speed */
2393 static GstMessage *
2394 handle_redirect_message (GstURISourceBin * urisrc, GstMessage * msg)
2395 {
2396   const GValue *locations_list, *location_val;
2397   GstMessage *new_msg;
2398   GstStructure *new_structure = NULL;
2399   GList *l_good = NULL, *l_neutral = NULL, *l_bad = NULL;
2400   GValue new_list = { 0, };
2401   guint size, i;
2402   const GstStructure *structure;
2403
2404   GST_DEBUG_OBJECT (urisrc, "redirect message: %" GST_PTR_FORMAT, msg);
2405   GST_DEBUG_OBJECT (urisrc, "connection speed: %" G_GUINT64_FORMAT,
2406       urisrc->connection_speed);
2407
2408   structure = gst_message_get_structure (msg);
2409   if (urisrc->connection_speed == 0 || structure == NULL)
2410     return msg;
2411
2412   locations_list = gst_structure_get_value (structure, "locations");
2413   if (locations_list == NULL)
2414     return msg;
2415
2416   size = gst_value_list_get_size (locations_list);
2417   if (size < 2)
2418     return msg;
2419
2420   /* maintain existing order as much as possible, just sort references
2421    * with too high a bitrate to the end (the assumption being that if
2422    * bitrates are given they are given for all interesting streams and
2423    * that the you-need-at-least-version-xyz redirect has the same bitrate
2424    * as the lowest referenced redirect alternative) */
2425   for (i = 0; i < size; ++i) {
2426     const GstStructure *s;
2427     gint bitrate = 0;
2428
2429     location_val = gst_value_list_get_value (locations_list, i);
2430     s = (const GstStructure *) g_value_get_boxed (location_val);
2431     if (!gst_structure_get_int (s, "minimum-bitrate", &bitrate) || bitrate <= 0) {
2432       GST_DEBUG_OBJECT (urisrc, "no bitrate: %" GST_PTR_FORMAT, s);
2433       l_neutral = g_list_append (l_neutral, (gpointer) s);
2434     } else if (bitrate > urisrc->connection_speed) {
2435       GST_DEBUG_OBJECT (urisrc, "bitrate too high: %" GST_PTR_FORMAT, s);
2436       l_bad = g_list_append (l_bad, (gpointer) s);
2437     } else if (bitrate <= urisrc->connection_speed) {
2438       GST_DEBUG_OBJECT (urisrc, "bitrate OK: %" GST_PTR_FORMAT, s);
2439       l_good = g_list_append (l_good, (gpointer) s);
2440     }
2441   }
2442
2443   g_value_init (&new_list, GST_TYPE_LIST);
2444   value_list_append_structure_list (&new_list, &new_structure, l_good);
2445   value_list_append_structure_list (&new_list, &new_structure, l_neutral);
2446   value_list_append_structure_list (&new_list, &new_structure, l_bad);
2447   gst_structure_take_value (new_structure, "locations", &new_list);
2448
2449   g_list_free (l_good);
2450   g_list_free (l_neutral);
2451   g_list_free (l_bad);
2452
2453   new_msg = gst_message_new_element (msg->src, new_structure);
2454   gst_message_unref (msg);
2455
2456   GST_DEBUG_OBJECT (urisrc, "new redirect message: %" GST_PTR_FORMAT, new_msg);
2457   return new_msg;
2458 }
2459
2460 static void
2461 handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
2462 {
2463   gint perc, msg_perc;
2464   gint smaller_perc = 100;
2465   GstMessage *smaller = NULL;
2466   GList *found = NULL;
2467   GList *iter;
2468   OutputSlotInfo *slot;
2469
2470   /* buffering messages must be aggregated as there might be multiple
2471    * multiqueue in the pipeline and their independent buffering messages
2472    * will confuse the application
2473    *
2474    * urisourcebin keeps a list of messages received from elements that are
2475    * buffering.
2476    * Rules are:
2477    * 0) Ignore buffering from elements that are draining (is_eos == TRUE)
2478    * 1) Always post the smaller buffering %
2479    * 2) If an element posts a 100% buffering message, remove it from the list
2480    * 3) When there are no more messages on the list, post 100% message
2481    * 4) When an element posts a new buffering message, update the one
2482    *    on the list to this new value
2483    */
2484   gst_message_parse_buffering (msg, &msg_perc);
2485   GST_LOG_OBJECT (urisrc, "Got buffering msg from %" GST_PTR_FORMAT
2486       " with %d%%", GST_MESSAGE_SRC (msg), msg_perc);
2487
2488   slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (msg)),
2489       "urisourcebin.slotinfo");
2490
2491   BUFFERING_LOCK (urisrc);
2492   if (slot && slot->is_eos) {
2493     /* Ignore buffering messages from queues we marked as EOS,
2494      * we already removed those from the list of buffering
2495      * objects */
2496     BUFFERING_UNLOCK (urisrc);
2497     gst_message_replace (&msg, NULL);
2498     return;
2499   }
2500
2501
2502   g_mutex_lock (&urisrc->buffering_post_lock);
2503
2504   /*
2505    * Single loop for 2 things:
2506    * 1) Look for a message with the same source
2507    *   1.1) If the received message is 100%, remove it from the list
2508    * 2) Find the minimum buffering from the list from elements that aren't EOS
2509    */
2510   for (iter = urisrc->buffering_status; iter;) {
2511     GstMessage *bufstats = iter->data;
2512     gboolean is_eos = FALSE;
2513
2514     slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)),
2515         "urisourcebin.slotinfo");
2516     if (slot)
2517       is_eos = slot->is_eos;
2518
2519     if (GST_MESSAGE_SRC (bufstats) == GST_MESSAGE_SRC (msg)) {
2520       found = iter;
2521       if (msg_perc < 100) {
2522         gst_message_unref (iter->data);
2523         bufstats = iter->data = gst_message_ref (msg);
2524       } else {
2525         GList *current = iter;
2526
2527         /* remove the element here and avoid confusing the loop */
2528         iter = g_list_next (iter);
2529
2530         gst_message_unref (current->data);
2531         urisrc->buffering_status =
2532             g_list_delete_link (urisrc->buffering_status, current);
2533
2534         continue;
2535       }
2536     }
2537
2538     /* only update minimum stat for non-EOS slots */
2539     if (!is_eos) {
2540       gst_message_parse_buffering (bufstats, &perc);
2541       if (perc < smaller_perc) {
2542         smaller_perc = perc;
2543         smaller = bufstats;
2544       }
2545     } else {
2546       GST_LOG_OBJECT (urisrc, "Ignoring buffering from EOS element");
2547     }
2548     iter = g_list_next (iter);
2549   }
2550
2551   if (found == NULL && msg_perc < 100) {
2552     if (msg_perc < smaller_perc) {
2553       smaller_perc = msg_perc;
2554       smaller = msg;
2555     }
2556     urisrc->buffering_status =
2557         g_list_prepend (urisrc->buffering_status, gst_message_ref (msg));
2558   }
2559
2560   if (smaller_perc == urisrc->last_buffering_pct) {
2561     /* Don't repeat our last buffering status */
2562     gst_message_replace (&msg, NULL);
2563   } else {
2564     urisrc->last_buffering_pct = smaller_perc;
2565
2566     /* now compute the buffering message that should be posted */
2567     if (smaller_perc == 100) {
2568       g_assert (urisrc->buffering_status == NULL);
2569       /* we are posting the original received msg */
2570     } else {
2571       gst_message_replace (&msg, smaller);
2572     }
2573   }
2574   BUFFERING_UNLOCK (urisrc);
2575
2576   if (msg) {
2577     GST_LOG_OBJECT (urisrc, "Sending buffering msg from %" GST_PTR_FORMAT
2578         " with %d%%", GST_MESSAGE_SRC (msg), smaller_perc);
2579     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN (urisrc), msg);
2580   } else {
2581     GST_LOG_OBJECT (urisrc, "Dropped buffering msg as a repeat of %d%%",
2582         smaller_perc);
2583   }
2584   g_mutex_unlock (&urisrc->buffering_post_lock);
2585 }
2586
2587 /* Remove any buffering message from the given source */
2588 static void
2589 remove_buffering_msgs (GstURISourceBin * urisrc, GstObject * src)
2590 {
2591   GList *iter;
2592   gboolean removed = FALSE, post;
2593
2594   BUFFERING_LOCK (urisrc);
2595   g_mutex_lock (&urisrc->buffering_post_lock);
2596
2597   GST_DEBUG_OBJECT (urisrc, "Removing %" GST_PTR_FORMAT
2598       " buffering messages", src);
2599
2600   for (iter = urisrc->buffering_status; iter;) {
2601     GstMessage *bufstats = iter->data;
2602     if (GST_MESSAGE_SRC (bufstats) == src) {
2603       gst_message_unref (bufstats);
2604       urisrc->buffering_status =
2605           g_list_delete_link (urisrc->buffering_status, iter);
2606       removed = TRUE;
2607       break;
2608     }
2609     iter = g_list_next (iter);
2610   }
2611
2612   post = (removed && urisrc->buffering_status == NULL);
2613   BUFFERING_UNLOCK (urisrc);
2614
2615   if (post) {
2616     GST_DEBUG_OBJECT (urisrc, "Last buffering element done - posting 100%%");
2617
2618     /* removed the last buffering element, post 100% */
2619     gst_element_post_message (GST_ELEMENT_CAST (urisrc),
2620         gst_message_new_buffering (GST_OBJECT_CAST (urisrc), 100));
2621   }
2622
2623   g_mutex_unlock (&urisrc->buffering_post_lock);
2624 }
2625
2626 static void
2627 handle_message (GstBin * bin, GstMessage * msg)
2628 {
2629   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (bin);
2630
2631   switch (GST_MESSAGE_TYPE (msg)) {
2632     case GST_MESSAGE_ELEMENT:{
2633       if (gst_message_has_name (msg, "redirect")) {
2634         /* sort redirect messages based on the connection speed. This simplifies
2635          * the user of this element as it can in most cases just pick the first item
2636          * of the sorted list as a good redirection candidate. It can of course
2637          * choose something else from the list if it has a better way. */
2638         msg = handle_redirect_message (urisrc, msg);
2639       }
2640       break;
2641     }
2642     case GST_MESSAGE_STREAM_COLLECTION:
2643     {
2644       GST_DEBUG_OBJECT (urisrc, "Source is streams-aware");
2645       urisrc->source_streams_aware = TRUE;
2646       break;
2647     }
2648     case GST_MESSAGE_BUFFERING:
2649       handle_buffering_message (urisrc, msg);
2650       msg = NULL;
2651       break;
2652     default:
2653       break;
2654   }
2655
2656   if (msg)
2657     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
2658 }
2659
2660 /* generic struct passed to all query fold methods
2661  * FIXME, move to core.
2662  */
2663 typedef struct
2664 {
2665   GstQuery *query;
2666   gint64 min;
2667   gint64 max;
2668   gboolean seekable;
2669   gboolean live;
2670 } QueryFold;
2671
2672 typedef void (*QueryInitFunction) (GstURISourceBin * urisrc, QueryFold * fold);
2673 typedef void (*QueryDoneFunction) (GstURISourceBin * urisrc, QueryFold * fold);
2674
2675 /* for duration/position we collect all durations/positions and take
2676  * the MAX of all valid results */
2677 static void
2678 uri_source_query_init (GstURISourceBin * urisrc, QueryFold * fold)
2679 {
2680   fold->min = 0;
2681   fold->max = -1;
2682   fold->seekable = TRUE;
2683   fold->live = 0;
2684 }
2685
2686 static gboolean
2687 uri_source_query_duration_fold (const GValue * item, GValue * ret,
2688     QueryFold * fold)
2689 {
2690   GstPad *pad = g_value_get_object (item);
2691
2692   if (gst_pad_query (pad, fold->query)) {
2693     gint64 duration;
2694
2695     g_value_set_boolean (ret, TRUE);
2696
2697     gst_query_parse_duration (fold->query, NULL, &duration);
2698
2699     GST_DEBUG_OBJECT (item, "got duration %" G_GINT64_FORMAT, duration);
2700
2701     if (duration > fold->max)
2702       fold->max = duration;
2703   }
2704   return TRUE;
2705 }
2706
2707 static void
2708 uri_source_query_duration_done (GstURISourceBin * urisrc, QueryFold * fold)
2709 {
2710   GstFormat format;
2711
2712   gst_query_parse_duration (fold->query, &format, NULL);
2713   /* store max in query result */
2714   gst_query_set_duration (fold->query, format, fold->max);
2715
2716   GST_DEBUG ("max duration %" G_GINT64_FORMAT, fold->max);
2717 }
2718
2719 static gboolean
2720 uri_source_query_position_fold (const GValue * item, GValue * ret,
2721     QueryFold * fold)
2722 {
2723   GstPad *pad = g_value_get_object (item);
2724
2725   if (gst_pad_query (pad, fold->query)) {
2726     gint64 position;
2727
2728     g_value_set_boolean (ret, TRUE);
2729
2730     gst_query_parse_position (fold->query, NULL, &position);
2731
2732     GST_DEBUG_OBJECT (item, "got position %" G_GINT64_FORMAT, position);
2733
2734     if (position > fold->max)
2735       fold->max = position;
2736   }
2737
2738   return TRUE;
2739 }
2740
2741 static void
2742 uri_source_query_position_done (GstURISourceBin * urisrc, QueryFold * fold)
2743 {
2744   GstFormat format;
2745
2746   gst_query_parse_position (fold->query, &format, NULL);
2747   /* store max in query result */
2748   gst_query_set_position (fold->query, format, fold->max);
2749
2750   GST_DEBUG_OBJECT (urisrc, "max position %" G_GINT64_FORMAT, fold->max);
2751 }
2752
2753 static gboolean
2754 uri_source_query_latency_fold (const GValue * item, GValue * ret,
2755     QueryFold * fold)
2756 {
2757   GstPad *pad = g_value_get_object (item);
2758
2759   if (gst_pad_query (pad, fold->query)) {
2760     GstClockTime min, max;
2761     gboolean live;
2762
2763     gst_query_parse_latency (fold->query, &live, &min, &max);
2764
2765     GST_DEBUG_OBJECT (pad,
2766         "got latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
2767         ", live %d", GST_TIME_ARGS (min), GST_TIME_ARGS (max), live);
2768
2769     if (live) {
2770       /* for the combined latency we collect the MAX of all min latencies and
2771        * the MIN of all max latencies */
2772       if (min > fold->min)
2773         fold->min = min;
2774       if (fold->max == -1)
2775         fold->max = max;
2776       else if (max < fold->max)
2777         fold->max = max;
2778
2779       fold->live = TRUE;
2780     }
2781   } else {
2782     GST_LOG_OBJECT (pad, "latency query failed");
2783     g_value_set_boolean (ret, FALSE);
2784   }
2785
2786   return TRUE;
2787 }
2788
2789 static void
2790 uri_source_query_latency_done (GstURISourceBin * urisrc, QueryFold * fold)
2791 {
2792   /* store max in query result */
2793   gst_query_set_latency (fold->query, fold->live, fold->min, fold->max);
2794
2795   GST_DEBUG_OBJECT (urisrc,
2796       "latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
2797       ", live %d", GST_TIME_ARGS (fold->min), GST_TIME_ARGS (fold->max),
2798       fold->live);
2799 }
2800
2801 /* we are seekable if all srcpads are seekable */
2802 static gboolean
2803 uri_source_query_seeking_fold (const GValue * item, GValue * ret,
2804     QueryFold * fold)
2805 {
2806   GstPad *pad = g_value_get_object (item);
2807
2808   if (gst_pad_query (pad, fold->query)) {
2809     gboolean seekable;
2810
2811     g_value_set_boolean (ret, TRUE);
2812     gst_query_parse_seeking (fold->query, NULL, &seekable, NULL, NULL);
2813
2814     GST_DEBUG_OBJECT (item, "got seekable %d", seekable);
2815
2816     if (fold->seekable)
2817       fold->seekable = seekable;
2818   }
2819
2820   return TRUE;
2821 }
2822
2823 static void
2824 uri_source_query_seeking_done (GstURISourceBin * urisrc, QueryFold * fold)
2825 {
2826   GstFormat format;
2827
2828   gst_query_parse_seeking (fold->query, &format, NULL, NULL, NULL);
2829   gst_query_set_seeking (fold->query, format, fold->seekable, 0, -1);
2830
2831   GST_DEBUG_OBJECT (urisrc, "seekable %d", fold->seekable);
2832 }
2833
2834 /* generic fold, return first valid result */
2835 static gboolean
2836 uri_source_query_generic_fold (const GValue * item, GValue * ret,
2837     QueryFold * fold)
2838 {
2839   GstPad *pad = g_value_get_object (item);
2840   gboolean res;
2841
2842   if ((res = gst_pad_query (pad, fold->query))) {
2843     g_value_set_boolean (ret, TRUE);
2844     GST_DEBUG_OBJECT (item, "answered query %p", fold->query);
2845   }
2846
2847   /* and stop as soon as we have a valid result */
2848   return !res;
2849 }
2850
2851 /* we're a bin, the default query handler iterates sink elements, which we don't
2852  * have normally. We should just query all source pads.
2853  */
2854 static gboolean
2855 gst_uri_source_bin_query (GstElement * element, GstQuery * query)
2856 {
2857   GstURISourceBin *urisrc;
2858   gboolean res = FALSE;
2859   GstIterator *iter;
2860   GstIteratorFoldFunction fold_func;
2861   QueryInitFunction fold_init = NULL;
2862   QueryDoneFunction fold_done = NULL;
2863   QueryFold fold_data;
2864   GValue ret = { 0 };
2865   gboolean default_ret = FALSE;
2866
2867   urisrc = GST_URI_SOURCE_BIN (element);
2868
2869   switch (GST_QUERY_TYPE (query)) {
2870     case GST_QUERY_DURATION:
2871       /* iterate and collect durations */
2872       fold_func = (GstIteratorFoldFunction) uri_source_query_duration_fold;
2873       fold_init = uri_source_query_init;
2874       fold_done = uri_source_query_duration_done;
2875       break;
2876     case GST_QUERY_POSITION:
2877       /* iterate and collect durations */
2878       fold_func = (GstIteratorFoldFunction) uri_source_query_position_fold;
2879       fold_init = uri_source_query_init;
2880       fold_done = uri_source_query_position_done;
2881       break;
2882     case GST_QUERY_LATENCY:
2883       /* iterate and collect durations */
2884       fold_func = (GstIteratorFoldFunction) uri_source_query_latency_fold;
2885       fold_init = uri_source_query_init;
2886       fold_done = uri_source_query_latency_done;
2887       default_ret = TRUE;
2888       break;
2889     case GST_QUERY_SEEKING:
2890       /* iterate and collect durations */
2891       fold_func = (GstIteratorFoldFunction) uri_source_query_seeking_fold;
2892       fold_init = uri_source_query_init;
2893       fold_done = uri_source_query_seeking_done;
2894       break;
2895     default:
2896       fold_func = (GstIteratorFoldFunction) uri_source_query_generic_fold;
2897       break;
2898   }
2899
2900   fold_data.query = query;
2901
2902   g_value_init (&ret, G_TYPE_BOOLEAN);
2903   g_value_set_boolean (&ret, default_ret);
2904
2905   iter = gst_element_iterate_src_pads (element);
2906   GST_DEBUG_OBJECT (element, "Sending query %p (type %d) to src pads",
2907       query, GST_QUERY_TYPE (query));
2908
2909   if (fold_init)
2910     fold_init (urisrc, &fold_data);
2911
2912   while (TRUE) {
2913     GstIteratorResult ires;
2914
2915     ires = gst_iterator_fold (iter, fold_func, &ret, &fold_data);
2916
2917     switch (ires) {
2918       case GST_ITERATOR_RESYNC:
2919         gst_iterator_resync (iter);
2920         if (fold_init)
2921           fold_init (urisrc, &fold_data);
2922         g_value_set_boolean (&ret, default_ret);
2923         break;
2924       case GST_ITERATOR_OK:
2925       case GST_ITERATOR_DONE:
2926         res = g_value_get_boolean (&ret);
2927         if (fold_done != NULL && res)
2928           fold_done (urisrc, &fold_data);
2929         goto done;
2930       default:
2931         res = FALSE;
2932         goto done;
2933     }
2934   }
2935 done:
2936   gst_iterator_free (iter);
2937
2938   return res;
2939 }
2940
2941 static GstStateChangeReturn
2942 gst_uri_source_bin_change_state (GstElement * element,
2943     GstStateChange transition)
2944 {
2945   GstStateChangeReturn ret;
2946   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (element);
2947
2948   switch (transition) {
2949     case GST_STATE_CHANGE_READY_TO_PAUSED:
2950       GST_DEBUG ("ready to paused");
2951       if (!setup_source (urisrc))
2952         goto source_failed;
2953       break;
2954     default:
2955       break;
2956   }
2957
2958   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2959   if (ret == GST_STATE_CHANGE_FAILURE)
2960     goto setup_failed;
2961
2962   switch (transition) {
2963     case GST_STATE_CHANGE_READY_TO_PAUSED:
2964       break;
2965     case GST_STATE_CHANGE_PAUSED_TO_READY:
2966       GST_DEBUG ("paused to ready");
2967       remove_source (urisrc);
2968       g_list_free_full (urisrc->buffering_status,
2969           (GDestroyNotify) gst_message_unref);
2970       urisrc->buffering_status = NULL;
2971       urisrc->last_buffering_pct = -1;
2972       urisrc->source_streams_aware = FALSE;
2973       break;
2974     case GST_STATE_CHANGE_READY_TO_NULL:
2975       GST_DEBUG ("ready to null");
2976       remove_source (urisrc);
2977       break;
2978     default:
2979       break;
2980   }
2981   return ret;
2982
2983   /* ERRORS */
2984 source_failed:
2985   {
2986     return GST_STATE_CHANGE_FAILURE;
2987   }
2988 setup_failed:
2989   {
2990     /* clean up leftover groups */
2991     return GST_STATE_CHANGE_FAILURE;
2992   }
2993 }