queue2: avoid ping-pong between 0% and 100% buffering messages
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  * @title: queue2
29  *
30  * Data is queued until one of the limits specified by the
31  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
32  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
33  * more buffers into the queue will block the pushing thread until more space
34  * becomes available.
35  *
36  * The queue will create a new thread on the source pad to decouple the
37  * processing on sink and source pad.
38  *
39  * You can query how many buffers are queued by reading the
40  * #GstQueue2:current-level-buffers property.
41  *
42  * The default queue size limits are 100 buffers, 2MB of data, or
43  * two seconds worth of data, whichever is reached first.
44  *
45  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
46  * will allocate a random free filename and buffer data in the file.
47  * By using this, it will buffer the entire stream data on the file independently
48  * of the queue size limits, they will only be used for buffering statistics.
49  *
50  * The temp-location property will be used to notify the application of the
51  * allocated filename.
52  */
53
54 #ifdef HAVE_CONFIG_H
55 #include "config.h"
56 #endif
57
58 #include "gstqueue2.h"
59
60 #include <glib/gstdio.h>
61
62 #include "gst/gst-i18n-lib.h"
63 #include "gst/glib-compat-private.h"
64
65 #include <string.h>
66
67 #ifdef G_OS_WIN32
68 #include <io.h>                 /* lseek, open, close, read */
69 #undef lseek
70 #define lseek _lseeki64
71 #undef off_t
72 #define off_t guint64
73 #else
74 #include <unistd.h>
75 #endif
76
77 #ifdef __BIONIC__               /* Android */
78 #include <fcntl.h>
79 #endif
80
81 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
82     GST_PAD_SINK,
83     GST_PAD_ALWAYS,
84     GST_STATIC_CAPS_ANY);
85
86 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
87     GST_PAD_SRC,
88     GST_PAD_ALWAYS,
89     GST_STATIC_CAPS_ANY);
90
91 GST_DEBUG_CATEGORY_STATIC (queue_debug);
92 #define GST_CAT_DEFAULT (queue_debug)
93 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
94
95 enum
96 {
97   LAST_SIGNAL
98 };
99
100 /* other defines */
101 #define DEFAULT_BUFFER_SIZE 4096
102 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
103 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
104 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
105
106 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
107
108 /* default property values */
109 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
110 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
111 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
112 #define DEFAULT_USE_BUFFERING      FALSE
113 #define DEFAULT_USE_TAGS_BITRATE   FALSE
114 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
115 #define DEFAULT_LOW_PERCENT        10
116 #define DEFAULT_HIGH_PERCENT       99
117 #define DEFAULT_LOW_WATERMARK      0.01
118 #define DEFAULT_HIGH_WATERMARK     0.99
119 #define DEFAULT_TEMP_REMOVE        TRUE
120 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
121
122 enum
123 {
124   PROP_0,
125   PROP_CUR_LEVEL_BUFFERS,
126   PROP_CUR_LEVEL_BYTES,
127   PROP_CUR_LEVEL_TIME,
128   PROP_MAX_SIZE_BUFFERS,
129   PROP_MAX_SIZE_BYTES,
130   PROP_MAX_SIZE_TIME,
131   PROP_USE_BUFFERING,
132   PROP_USE_TAGS_BITRATE,
133   PROP_USE_RATE_ESTIMATE,
134   PROP_LOW_PERCENT,
135   PROP_HIGH_PERCENT,
136   PROP_LOW_WATERMARK,
137   PROP_HIGH_WATERMARK,
138   PROP_TEMP_TEMPLATE,
139   PROP_TEMP_LOCATION,
140   PROP_TEMP_REMOVE,
141   PROP_RING_BUFFER_MAX_SIZE,
142   PROP_AVG_IN_RATE,
143   PROP_LAST
144 };
145
146 /* Explanation for buffer levels and percentages:
147  *
148  * The buffering_level functions here return a value in a normalized range
149  * that specifies the queue's current fill level. The range goes from 0 to
150  * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
151  *
152  * This is not to be confused with the buffering_percent value, which is
153  * a *relative* quantity - relative to the low/high watermarks.
154  * buffering_percent = 0% means buffering_level is at the low watermark.
155  * buffering_percent = 100% means buffering_level is at the high watermark.
156  * buffering_percent is used for determining if the fill level has reached
157  * the high watermark, and for producing BUFFERING messages. This value
158  * always uses a 0..100 range (since it is a percentage).
159  *
160  * To avoid future confusions, whenever "buffering level" is mentioned, it
161  * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
162  * range. Whenever "buffering_percent" is mentioned, it refers to the
163  * percentage value that is relative to the low/high watermark. */
164
165 /* Using a buffering level range of 0..1000000 to allow for a
166  * resolution in ppm (1 ppm = 0.0001%) */
167 #define MAX_BUFFERING_LEVEL 1000000
168
169 /* How much 1% makes up in the buffer level range */
170 #define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
171
172 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
173   l.buffers = 0;                                        \
174   l.bytes = 0;                                          \
175   l.time = 0;                                           \
176   l.rate_time = 0;                                      \
177 } G_STMT_END
178
179 #define STATUS(queue, pad, msg) \
180   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
181                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
182                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
183                       " ns, %"G_GUINT64_FORMAT" items", \
184                       GST_DEBUG_PAD_NAME (pad), \
185                       queue->cur_level.buffers, \
186                       queue->max_level.buffers, \
187                       queue->cur_level.bytes, \
188                       queue->max_level.bytes, \
189                       queue->cur_level.time, \
190                       queue->max_level.time, \
191                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
192                         queue->current->writing_pos - queue->current->max_reading_pos : \
193                         gst_queue_array_get_length(queue->queue)))
194
195 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
196   g_mutex_lock (&q->qlock);                                              \
197 } G_STMT_END
198
199 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
200   GST_QUEUE2_MUTEX_LOCK (q);                                            \
201   if (res != GST_FLOW_OK)                                               \
202     goto label;                                                         \
203 } G_STMT_END
204
205 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
206   g_mutex_unlock (&q->qlock);                                            \
207 } G_STMT_END
208
209 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
210   STATUS (queue, q->sinkpad, "wait for DEL");                           \
211   q->waiting_del = TRUE;                                                \
212   g_cond_wait (&q->item_del, &queue->qlock);                              \
213   q->waiting_del = FALSE;                                               \
214   if (res != GST_FLOW_OK) {                                             \
215     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
216     goto label;                                                         \
217   }                                                                     \
218   STATUS (queue, q->sinkpad, "received DEL");                           \
219 } G_STMT_END
220
221 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
222   STATUS (queue, q->srcpad, "wait for ADD");                            \
223   q->waiting_add = TRUE;                                                \
224   g_cond_wait (&q->item_add, &q->qlock);                                  \
225   q->waiting_add = FALSE;                                               \
226   if (res != GST_FLOW_OK) {                                             \
227     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
228     goto label;                                                         \
229   }                                                                     \
230   STATUS (queue, q->srcpad, "received ADD");                            \
231 } G_STMT_END
232
233 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
234   if (q->waiting_del) {                                                 \
235     STATUS (q, q->srcpad, "signal DEL");                                \
236     g_cond_signal (&q->item_del);                                        \
237   }                                                                     \
238 } G_STMT_END
239
240 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
241   if (q->waiting_add) {                                                 \
242     STATUS (q, q->sinkpad, "signal ADD");                               \
243     g_cond_signal (&q->item_add);                                        \
244   }                                                                     \
245 } G_STMT_END
246
247 #define SET_PERCENT(q, perc) G_STMT_START {                              \
248   if (perc != q->buffering_percent) {                                    \
249     q->buffering_percent = perc;                                         \
250     q->percent_changed = TRUE;                                           \
251     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
252     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
253         &q->buffering_left);                                             \
254   }                                                                      \
255 } G_STMT_END
256
257 #define _do_init \
258     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
259     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
260         "dataflow inside the queue element");
261 #define gst_queue2_parent_class parent_class
262 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
263
264 static void gst_queue2_finalize (GObject * object);
265
266 static void gst_queue2_set_property (GObject * object,
267     guint prop_id, const GValue * value, GParamSpec * pspec);
268 static void gst_queue2_get_property (GObject * object,
269     guint prop_id, GValue * value, GParamSpec * pspec);
270
271 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
272     GstBuffer * buffer);
273 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
274     GstBufferList * buffer_list);
275 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
276 static void gst_queue2_loop (GstPad * pad);
277
278 static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad,
279     GstObject * parent, GstEvent * event);
280 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
281     GstQuery * query);
282
283 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
284     GstEvent * event);
285 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
286     GstQuery * query);
287 static gboolean gst_queue2_handle_query (GstElement * element,
288     GstQuery * query);
289
290 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
291     guint64 offset, guint length, GstBuffer ** buffer);
292
293 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
294     GstPadMode mode, gboolean active);
295 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
296     GstPadMode mode, gboolean active);
297 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
298     GstStateChange transition);
299
300 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
301 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
302
303 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
304 static void update_in_rates (GstQueue2 * queue, gboolean force);
305 static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue);
306 static void gst_queue2_post_buffering (GstQueue2 * queue);
307
308 typedef enum
309 {
310   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
311   GST_QUEUE2_ITEM_TYPE_BUFFER,
312   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
313   GST_QUEUE2_ITEM_TYPE_EVENT,
314   GST_QUEUE2_ITEM_TYPE_QUERY
315 } GstQueue2ItemType;
316
317 typedef struct
318 {
319   GstQueue2ItemType type;
320   GstMiniObject *item;
321 } GstQueue2Item;
322
323 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
324
325 static void
326 gst_queue2_class_init (GstQueue2Class * klass)
327 {
328   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
329   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
330
331   gobject_class->set_property = gst_queue2_set_property;
332   gobject_class->get_property = gst_queue2_get_property;
333
334   /* properties */
335   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
336       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
337           "Current amount of data in the queue (bytes)",
338           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
339   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
340       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
341           "Current number of buffers in the queue",
342           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
343   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
344       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
345           "Current amount of data in the queue (in ns)",
346           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
347
348   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
349       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
350           "Max. amount of data in the queue (bytes, 0=disable)",
351           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
352           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
353           G_PARAM_STATIC_STRINGS));
354   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
355       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
356           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
357           DEFAULT_MAX_SIZE_BUFFERS,
358           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
359           G_PARAM_STATIC_STRINGS));
360   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
361       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
362           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
363           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
364           G_PARAM_STATIC_STRINGS));
365
366   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
367       g_param_spec_boolean ("use-buffering", "Use buffering",
368           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
369           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
370           G_PARAM_STATIC_STRINGS));
371   g_object_class_install_property (gobject_class, PROP_USE_TAGS_BITRATE,
372       g_param_spec_boolean ("use-tags-bitrate", "Use bitrate from tags",
373           "Use a bitrate from upstream tags to estimate buffer duration if not provided",
374           DEFAULT_USE_TAGS_BITRATE,
375           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
376           G_PARAM_STATIC_STRINGS));
377   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
378       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
379           "Estimate the bitrate of the stream to calculate time level",
380           DEFAULT_USE_RATE_ESTIMATE,
381           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
382   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
383       g_param_spec_int ("low-percent", "Low percent",
384           "Low threshold for buffering to start. Only used if use-buffering is True "
385           "(Deprecated: use low-watermark instead)",
386           0, 100, DEFAULT_LOW_WATERMARK * 100,
387           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
388   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
389       g_param_spec_int ("high-percent", "High percent",
390           "High threshold for buffering to finish. Only used if use-buffering is True "
391           "(Deprecated: use high-watermark instead)",
392           0, 100, DEFAULT_HIGH_WATERMARK * 100,
393           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
394   g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
395       g_param_spec_double ("low-watermark", "Low watermark",
396           "Low threshold for buffering to start. Only used if use-buffering is True",
397           0.0, 1.0, DEFAULT_LOW_WATERMARK,
398           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399   g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
400       g_param_spec_double ("high-watermark", "High watermark",
401           "High threshold for buffering to finish. Only used if use-buffering is True",
402           0.0, 1.0, DEFAULT_HIGH_WATERMARK,
403           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404
405   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
406       g_param_spec_string ("temp-template", "Temporary File Template",
407           "File template to store temporary files in, should contain directory "
408           "and XXXXXX. (NULL == disabled)",
409           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
410
411   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
412       g_param_spec_string ("temp-location", "Temporary File Location",
413           "Location to store temporary files in (Only read this property, "
414           "use temp-template to configure the name template)",
415           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
416
417   /**
418    * GstQueue2:temp-remove
419    *
420    * When temp-template is set, remove the temporary file when going to READY.
421    */
422   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
423       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
424           "Remove the temp-location after use",
425           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
426
427   /**
428    * GstQueue2:ring-buffer-max-size
429    *
430    * The maximum size of the ring buffer in bytes. If set to 0, the ring
431    * buffer is disabled. Default 0.
432    */
433   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
434       g_param_spec_uint64 ("ring-buffer-max-size",
435           "Max. ring buffer size (bytes)",
436           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
437           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
438           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
439
440   /**
441    * GstQueue2:avg-in-rate
442    *
443    * The average input data rate.
444    */
445   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
446       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
447           "Average input data rate (bytes/s)",
448           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
449
450   /* set several parent class virtual functions */
451   gobject_class->finalize = gst_queue2_finalize;
452
453   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
454   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
455
456   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
457       "Generic",
458       "Simple data queue",
459       "Erik Walthinsen <omega@cse.ogi.edu>, "
460       "Wim Taymans <wim.taymans@gmail.com>");
461
462   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
463   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
464 }
465
466 static void
467 gst_queue2_init (GstQueue2 * queue)
468 {
469   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
470
471   gst_pad_set_chain_function (queue->sinkpad,
472       GST_DEBUG_FUNCPTR (gst_queue2_chain));
473   gst_pad_set_chain_list_function (queue->sinkpad,
474       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
475   gst_pad_set_activatemode_function (queue->sinkpad,
476       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
477   gst_pad_set_event_full_function (queue->sinkpad,
478       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
479   gst_pad_set_query_function (queue->sinkpad,
480       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
481   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
482   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
483
484   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
485
486   gst_pad_set_activatemode_function (queue->srcpad,
487       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
488   gst_pad_set_getrange_function (queue->srcpad,
489       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
490   gst_pad_set_event_function (queue->srcpad,
491       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
492   gst_pad_set_query_function (queue->srcpad,
493       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
494   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
495   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
496
497   /* levels */
498   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
499   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
500   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
501   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
502   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
503   queue->use_buffering = DEFAULT_USE_BUFFERING;
504   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
505   queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
506   queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
507
508   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
509   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
510
511   queue->sinktime = GST_CLOCK_TIME_NONE;
512   queue->srctime = GST_CLOCK_TIME_NONE;
513   queue->sink_tainted = TRUE;
514   queue->src_tainted = TRUE;
515
516   queue->srcresult = GST_FLOW_FLUSHING;
517   queue->sinkresult = GST_FLOW_FLUSHING;
518   queue->is_eos = FALSE;
519   queue->in_timer = g_timer_new ();
520   queue->out_timer = g_timer_new ();
521
522   g_mutex_init (&queue->qlock);
523   queue->waiting_add = FALSE;
524   g_cond_init (&queue->item_add);
525   queue->waiting_del = FALSE;
526   g_cond_init (&queue->item_del);
527   queue->queue = gst_queue_array_new_for_struct (sizeof (GstQueue2Item), 32);
528
529   g_cond_init (&queue->query_handled);
530   queue->last_query = FALSE;
531
532   g_mutex_init (&queue->buffering_post_lock);
533   queue->buffering_percent = 100;
534   queue->last_posted_buffering_percent = -1;
535
536   /* tempfile related */
537   queue->temp_template = NULL;
538   queue->temp_location = NULL;
539   queue->temp_remove = DEFAULT_TEMP_REMOVE;
540
541   queue->ring_buffer = NULL;
542   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
543
544   GST_DEBUG_OBJECT (queue,
545       "initialized queue's not_empty & not_full conditions");
546 }
547
548 /* called only once, as opposed to dispose */
549 static void
550 gst_queue2_finalize (GObject * object)
551 {
552   GstQueue2 *queue = GST_QUEUE2 (object);
553   GstQueue2Item *qitem;
554
555   GST_DEBUG_OBJECT (queue, "finalizing queue");
556
557   while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
558     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
559       gst_mini_object_unref (qitem->item);
560   }
561   gst_queue_array_free (queue->queue);
562
563   queue->last_query = FALSE;
564   g_mutex_clear (&queue->qlock);
565   g_mutex_clear (&queue->buffering_post_lock);
566   g_cond_clear (&queue->item_add);
567   g_cond_clear (&queue->item_del);
568   g_cond_clear (&queue->query_handled);
569   g_timer_destroy (queue->in_timer);
570   g_timer_destroy (queue->out_timer);
571
572   /* temp_file path cleanup  */
573   g_free (queue->temp_template);
574   g_free (queue->temp_location);
575
576   G_OBJECT_CLASS (parent_class)->finalize (object);
577 }
578
579 static void
580 debug_ranges (GstQueue2 * queue)
581 {
582   GstQueue2Range *walk;
583
584   for (walk = queue->ranges; walk; walk = walk->next) {
585     GST_DEBUG_OBJECT (queue,
586         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
587         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
588         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
589         walk->rb_writing_pos, walk->reading_pos,
590         walk == queue->current ? "**y**" : "  n  ");
591   }
592 }
593
594 /* clear all the downloaded ranges */
595 static void
596 clean_ranges (GstQueue2 * queue)
597 {
598   GST_DEBUG_OBJECT (queue, "clean queue ranges");
599
600   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
601   queue->ranges = NULL;
602   queue->current = NULL;
603 }
604
605 /* find a range that contains @offset or NULL when nothing does */
606 static GstQueue2Range *
607 find_range (GstQueue2 * queue, guint64 offset)
608 {
609   GstQueue2Range *range = NULL;
610   GstQueue2Range *walk;
611
612   /* first do a quick check for the current range */
613   for (walk = queue->ranges; walk; walk = walk->next) {
614     if (offset >= walk->offset && offset <= walk->writing_pos) {
615       /* we can reuse an existing range */
616       range = walk;
617       break;
618     }
619   }
620   if (range) {
621     GST_DEBUG_OBJECT (queue,
622         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
623         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
624   } else {
625     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
626   }
627   return range;
628 }
629
630 static void
631 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
632 {
633   guint64 max_reading_pos, writing_pos;
634
635   writing_pos = range->writing_pos;
636   max_reading_pos = range->max_reading_pos;
637
638   if (writing_pos > max_reading_pos)
639     queue->cur_level.bytes = writing_pos - max_reading_pos;
640   else
641     queue->cur_level.bytes = 0;
642 }
643
644 /* make a new range for @offset or reuse an existing range */
645 static GstQueue2Range *
646 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
647 {
648   GstQueue2Range *range, *prev, *next;
649
650   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
651
652   if ((range = find_range (queue, offset))) {
653     GST_DEBUG_OBJECT (queue,
654         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
655         range->writing_pos);
656     if (update_existing && range->writing_pos != offset) {
657       GST_DEBUG_OBJECT (queue, "updating range writing position to "
658           "%" G_GUINT64_FORMAT, offset);
659       range->writing_pos = offset;
660     }
661   } else {
662     GST_DEBUG_OBJECT (queue,
663         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
664
665     range = g_slice_new0 (GstQueue2Range);
666     range->offset = offset;
667     /* we want to write to the next location in the ring buffer */
668     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
669     range->writing_pos = offset;
670     range->rb_writing_pos = range->rb_offset;
671     range->reading_pos = offset;
672     range->max_reading_pos = offset;
673
674     /* insert sorted */
675     prev = NULL;
676     next = queue->ranges;
677     while (next) {
678       if (next->offset > offset) {
679         /* insert before next */
680         GST_DEBUG_OBJECT (queue,
681             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
682             next->offset);
683         break;
684       }
685       /* try next */
686       prev = next;
687       next = next->next;
688     }
689     range->next = next;
690     if (prev)
691       prev->next = range;
692     else
693       queue->ranges = range;
694   }
695   debug_ranges (queue);
696
697   /* update the stats for this range */
698   update_cur_level (queue, range);
699
700   return range;
701 }
702
703
704 /* clear and init the download ranges for offset 0 */
705 static void
706 init_ranges (GstQueue2 * queue)
707 {
708   GST_DEBUG_OBJECT (queue, "init queue ranges");
709
710   /* get rid of all the current ranges */
711   clean_ranges (queue);
712   /* make a range for offset 0 */
713   queue->current = add_range (queue, 0, TRUE);
714 }
715
716 /* calculate the diff between running time on the sink and src of the queue.
717  * This is the total amount of time in the queue. */
718 static void
719 update_time_level (GstQueue2 * queue)
720 {
721   if (queue->sink_tainted) {
722     queue->sinktime =
723         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
724         queue->sink_segment.position);
725     queue->sink_tainted = FALSE;
726   }
727
728   if (queue->src_tainted) {
729     queue->srctime =
730         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
731         queue->src_segment.position);
732     queue->src_tainted = FALSE;
733   }
734
735   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
736       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
737
738   if (queue->sinktime != GST_CLOCK_TIME_NONE
739       && queue->srctime != GST_CLOCK_TIME_NONE
740       && queue->sinktime >= queue->srctime)
741     queue->cur_level.time = queue->sinktime - queue->srctime;
742   else
743     queue->cur_level.time = 0;
744 }
745
746 /* take a SEGMENT event and apply the values to segment, updating the time
747  * level of queue. */
748 static void
749 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
750     gboolean is_sink)
751 {
752   gst_event_copy_segment (event, segment);
753
754   if (segment->format == GST_FORMAT_BYTES) {
755     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
756       /* start is where we'll be getting from and as such writing next */
757       queue->current = add_range (queue, segment->start, TRUE);
758     }
759   }
760
761   /* now configure the values, we use these to track timestamps on the
762    * sinkpad. */
763   if (segment->format != GST_FORMAT_TIME) {
764     /* non-time format, pretend the current time segment is closed with a
765      * 0 start and unknown stop time. */
766     segment->format = GST_FORMAT_TIME;
767     segment->start = 0;
768     segment->stop = -1;
769     segment->time = 0;
770   }
771
772   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
773
774   if (is_sink)
775     queue->sink_tainted = TRUE;
776   else
777     queue->src_tainted = TRUE;
778
779   /* segment can update the time level of the queue */
780   update_time_level (queue);
781 }
782
783 static void
784 apply_gap (GstQueue2 * queue, GstEvent * event,
785     GstSegment * segment, gboolean is_sink)
786 {
787   GstClockTime timestamp;
788   GstClockTime duration;
789
790   gst_event_parse_gap (event, &timestamp, &duration);
791
792   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
793
794     if (GST_CLOCK_TIME_IS_VALID (duration)) {
795       timestamp += duration;
796     }
797
798     segment->position = timestamp;
799
800     if (is_sink)
801       queue->sink_tainted = TRUE;
802     else
803       queue->src_tainted = TRUE;
804
805     /* calc diff with other end */
806     update_time_level (queue);
807   }
808 }
809
810 /* take a buffer and update segment, updating the time level of the queue. */
811 static void
812 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
813     guint64 size, gboolean is_sink)
814 {
815   GstClockTime duration, timestamp;
816
817   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
818   duration = GST_BUFFER_DURATION (buffer);
819
820   /* If we have no duration, pick one from the bitrate if we can */
821   if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) {
822     guint bitrate =
823         is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
824     if (bitrate)
825       duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
826   }
827
828   /* if no timestamp is set, assume it's continuous with the previous
829    * time */
830   if (timestamp == GST_CLOCK_TIME_NONE)
831     timestamp = segment->position;
832
833   /* add duration */
834   if (duration != GST_CLOCK_TIME_NONE)
835     timestamp += duration;
836
837   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
838       GST_TIME_ARGS (timestamp));
839
840   segment->position = timestamp;
841
842   if (is_sink)
843     queue->sink_tainted = TRUE;
844   else
845     queue->src_tainted = TRUE;
846
847   /* calc diff with other end */
848   update_time_level (queue);
849 }
850
851 struct BufListData
852 {
853   GstClockTime timestamp;
854   guint bitrate;
855 };
856
857 static gboolean
858 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
859 {
860   struct BufListData *bld = data;
861   GstClockTime *timestamp = &bld->timestamp;
862   GstClockTime btime;
863
864   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
865       " duration %" GST_TIME_FORMAT, idx,
866       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
867       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
868       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
869
870   btime = GST_BUFFER_DTS_OR_PTS (*buf);
871   if (GST_CLOCK_TIME_IS_VALID (btime))
872     *timestamp = btime;
873
874   if (GST_BUFFER_DURATION_IS_VALID (*buf))
875     *timestamp += GST_BUFFER_DURATION (*buf);
876   else if (bld->bitrate != 0) {
877     guint64 size = gst_buffer_get_size (*buf);
878
879     /* If we have no duration, pick one from the bitrate if we can */
880     *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size);
881   }
882
883
884   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
885   return TRUE;
886 }
887
888 /* take a buffer list and update segment, updating the time level of the queue */
889 static void
890 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
891     GstSegment * segment, gboolean is_sink)
892 {
893   struct BufListData bld;
894
895   /* if no timestamp is set, assume it's continuous with the previous time */
896   bld.timestamp = segment->position;
897
898   if (queue->use_tags_bitrate) {
899     if (is_sink)
900       bld.bitrate = queue->sink_tags_bitrate;
901     else
902       bld.bitrate = queue->src_tags_bitrate;
903   } else
904     bld.bitrate = 0;
905
906   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
907
908   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
909       GST_TIME_ARGS (bld.timestamp));
910
911   segment->position = bld.timestamp;
912
913   if (is_sink)
914     queue->sink_tainted = TRUE;
915   else
916     queue->src_tainted = TRUE;
917
918   /* calc diff with other end */
919   update_time_level (queue);
920 }
921
922 static inline gint
923 normalize_to_buffering_level (guint64 cur_level, guint64 max_level,
924     guint64 alt_max)
925 {
926   guint64 p;
927
928   if (max_level == 0)
929     return 0;
930
931   if (alt_max > 0)
932     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL,
933         MIN (max_level, alt_max));
934   else
935     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level);
936
937   return MIN (p, MAX_BUFFERING_LEVEL);
938 }
939
940 static gboolean
941 get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
942     gint * buffering_level)
943 {
944   gint buflevel, buflevel2;
945
946   if (queue->high_watermark <= 0) {
947     if (buffering_level)
948       *buffering_level = MAX_BUFFERING_LEVEL;
949     if (is_buffering)
950       *is_buffering = FALSE;
951     return FALSE;
952   }
953 #define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
954     normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
955
956   if (queue->is_eos || queue->srcresult == GST_FLOW_NOT_LINKED) {
957     /* on EOS and NOT_LINKED we are always 100% full, we set the var
958      * here so that we can reuse the logic below to stop buffering */
959     buflevel = MAX_BUFFERING_LEVEL;
960     GST_LOG_OBJECT (queue, "we are %s", queue->is_eos ? "EOS" : "NOT_LINKED");
961   } else {
962     GST_LOG_OBJECT (queue,
963         "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
964         queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
965         queue->cur_level.buffers);
966
967     /* figure out the buffering level we are filled, we take the max of all formats. */
968     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
969       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0);
970     } else {
971       guint64 rb_size = queue->ring_buffer_max_size;
972       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size);
973     }
974
975     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0);
976     buflevel = MAX (buflevel, buflevel2);
977
978     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0);
979     buflevel = MAX (buflevel, buflevel2);
980
981     /* also apply the rate estimate when we need to */
982     if (queue->use_rate_estimate) {
983       buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0);
984       buflevel = MAX (buflevel, buflevel2);
985     }
986
987     /* Don't get to 0% unless we're really empty */
988     if (queue->cur_level.bytes > 0)
989       buflevel = MAX (1, buflevel);
990   }
991 #undef GET_BUFFER_LEVEL_FOR_QUANTITY
992
993   if (is_buffering)
994     *is_buffering = queue->is_buffering;
995
996   if (buffering_level)
997     *buffering_level = buflevel;
998
999   GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering,
1000       buflevel);
1001
1002   return TRUE;
1003 }
1004
1005 static gint
1006 convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level)
1007 {
1008   int percent;
1009
1010   /* scale so that if buffering_level equals the high watermark,
1011    * the percentage is 100% */
1012   percent = buffering_level * 100 / queue->high_watermark;
1013   /* clip */
1014   if (percent > 100)
1015     percent = 100;
1016
1017   return percent;
1018 }
1019
1020 static void
1021 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
1022     gint * avg_in, gint * avg_out, gint64 * buffering_left)
1023 {
1024   if (mode) {
1025     if (!QUEUE_IS_USING_QUEUE (queue)) {
1026       if (QUEUE_IS_USING_RING_BUFFER (queue))
1027         *mode = GST_BUFFERING_TIMESHIFT;
1028       else
1029         *mode = GST_BUFFERING_DOWNLOAD;
1030     } else {
1031       *mode = GST_BUFFERING_STREAM;
1032     }
1033   }
1034
1035   if (avg_in)
1036     *avg_in = queue->byte_in_rate;
1037   if (avg_out)
1038     *avg_out = queue->byte_out_rate;
1039
1040   if (buffering_left) {
1041     *buffering_left = (percent == 100 ? 0 : -1);
1042
1043     if (queue->use_rate_estimate) {
1044       guint64 max, cur;
1045
1046       max = queue->max_level.rate_time;
1047       cur = queue->cur_level.rate_time;
1048
1049       if (percent != 100 && max > cur)
1050         *buffering_left = (max - cur) / 1000000;
1051     }
1052   }
1053 }
1054
1055 /* Called with the lock taken */
1056 static GstMessage *
1057 gst_queue2_get_buffering_message (GstQueue2 * queue)
1058 {
1059   GstMessage *msg = NULL;
1060   if (queue->percent_changed) {
1061     /* Don't change the buffering level if the sinkpad is waiting for
1062      * space to become available.  This prevents the situation where,
1063      * upstream is pushing buffers larger than our limits so only 1 buffer
1064      * is ever in the queue at a time.
1065      * Changing the level causes a buffering message to be posted saying that
1066      * we are buffering which the application may pause to wait for another
1067      * 100% buffering message which would be posted very soon after the
1068      * waiting sink thread adds it's buffer to the queue */
1069     /* FIXME: This situation above can still occur later if
1070      * the sink pad is waiting to push a serialized event into the queue and
1071      * the queue becomes empty for a short period of time. */
1072     if (!queue->waiting_del
1073         && queue->last_posted_buffering_percent != queue->buffering_percent) {
1074       gint percent = queue->buffering_percent;
1075
1076       GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
1077       msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
1078
1079       gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
1080           queue->avg_out, queue->buffering_left);
1081
1082       queue->last_posted_buffering_percent = percent;
1083     }
1084     queue->percent_changed = FALSE;
1085   }
1086
1087   return msg;
1088 }
1089
1090 static void
1091 gst_queue2_post_buffering (GstQueue2 * queue)
1092 {
1093   GstMessage *msg = NULL;
1094
1095   g_mutex_lock (&queue->buffering_post_lock);
1096   GST_QUEUE2_MUTEX_LOCK (queue);
1097   msg = gst_queue2_get_buffering_message (queue);
1098   GST_QUEUE2_MUTEX_UNLOCK (queue);
1099
1100   if (msg != NULL)
1101     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
1102
1103   g_mutex_unlock (&queue->buffering_post_lock);
1104 }
1105
1106 static void
1107 update_buffering (GstQueue2 * queue)
1108 {
1109   gint buffering_level, percent;
1110
1111   /* Ensure the variables used to calculate buffering state are up-to-date. */
1112   if (queue->current)
1113     update_cur_level (queue, queue->current);
1114   update_in_rates (queue, FALSE);
1115
1116   if (!get_buffering_level (queue, NULL, &buffering_level))
1117     return;
1118
1119   percent = convert_to_buffering_percent (queue, buffering_level);
1120
1121   if (queue->is_buffering) {
1122     /* if we were buffering see if we reached the high watermark */
1123     if (percent >= 100)
1124       queue->is_buffering = FALSE;
1125
1126     SET_PERCENT (queue, percent);
1127   } else {
1128     /* we were not buffering, check if we need to start buffering if we drop
1129      * below the low threshold */
1130     if (buffering_level < queue->low_watermark) {
1131       queue->is_buffering = TRUE;
1132       SET_PERCENT (queue, percent);
1133     }
1134   }
1135 }
1136
1137 static void
1138 reset_rate_timer (GstQueue2 * queue)
1139 {
1140   queue->bytes_in = 0;
1141   queue->bytes_out = 0;
1142   queue->byte_in_rate = 0.0;
1143   queue->byte_in_period = 0;
1144   queue->byte_out_rate = 0.0;
1145   queue->last_update_in_rates_elapsed = 0.0;
1146   queue->last_in_elapsed = 0.0;
1147   queue->last_out_elapsed = 0.0;
1148   queue->in_timer_started = FALSE;
1149   queue->out_timer_started = FALSE;
1150 }
1151
1152 /* the interval in seconds to recalculate the rate */
1153 #define RATE_INTERVAL    0.2
1154 /* Tuning for rate estimation. We use a large window for the input rate because
1155  * it should be stable when connected to a network. The output rate is less
1156  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1157  * therefore adapt more quickly.
1158  * However, initial input rate may be subject to a burst, and should therefore
1159  * initially also adapt more quickly to changes, and only later on give higher
1160  * weight to previous values. */
1161 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1162 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1163
1164 static void
1165 update_in_rates (GstQueue2 * queue, gboolean force)
1166 {
1167   gdouble elapsed, period;
1168   gdouble byte_in_rate;
1169
1170   if (!queue->in_timer_started) {
1171     queue->in_timer_started = TRUE;
1172     g_timer_start (queue->in_timer);
1173     return;
1174   }
1175
1176   queue->last_update_in_rates_elapsed = elapsed =
1177       g_timer_elapsed (queue->in_timer, NULL);
1178
1179   /* recalc after each interval. */
1180   if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1181     period = elapsed - queue->last_in_elapsed;
1182
1183     GST_DEBUG_OBJECT (queue,
1184         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1185         period, queue->bytes_in, queue->byte_in_period);
1186
1187     byte_in_rate = queue->bytes_in / period;
1188
1189     if (queue->byte_in_rate == 0.0)
1190       queue->byte_in_rate = byte_in_rate;
1191     else
1192       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1193           (double) queue->byte_in_period, period);
1194
1195     /* another data point, cap at 16 for long time running average */
1196     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1197       queue->byte_in_period += period;
1198
1199     /* reset the values to calculate rate over the next interval */
1200     queue->last_in_elapsed = elapsed;
1201     queue->bytes_in = 0;
1202   }
1203
1204   if (queue->byte_in_rate > 0.0) {
1205     queue->cur_level.rate_time =
1206         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1207   }
1208   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1209       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1210 }
1211
1212 static void
1213 update_out_rates (GstQueue2 * queue)
1214 {
1215   gdouble elapsed, period;
1216   gdouble byte_out_rate;
1217
1218   if (!queue->out_timer_started) {
1219     queue->out_timer_started = TRUE;
1220     g_timer_start (queue->out_timer);
1221     return;
1222   }
1223
1224   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1225
1226   /* recalc after each interval. */
1227   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1228     period = elapsed - queue->last_out_elapsed;
1229
1230     GST_DEBUG_OBJECT (queue,
1231         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1232
1233     byte_out_rate = queue->bytes_out / period;
1234
1235     if (queue->byte_out_rate == 0.0)
1236       queue->byte_out_rate = byte_out_rate;
1237     else
1238       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1239
1240     /* reset the values to calculate rate over the next interval */
1241     queue->last_out_elapsed = elapsed;
1242     queue->bytes_out = 0;
1243   }
1244   if (queue->byte_in_rate > 0.0) {
1245     queue->cur_level.rate_time =
1246         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1247   }
1248   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1249       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1250 }
1251
1252 static void
1253 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1254 {
1255   guint64 reading_pos, max_reading_pos;
1256
1257   reading_pos = pos;
1258   max_reading_pos = range->max_reading_pos;
1259
1260   max_reading_pos = MAX (max_reading_pos, reading_pos);
1261
1262   GST_DEBUG_OBJECT (queue,
1263       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1264       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1265   range->max_reading_pos = max_reading_pos;
1266
1267   update_cur_level (queue, range);
1268 }
1269
1270 static gboolean
1271 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1272 {
1273   GstEvent *event;
1274   gboolean res;
1275
1276   /* until we receive the FLUSH_STOP from this seek, we skip data */
1277   queue->seeking = TRUE;
1278   GST_QUEUE2_MUTEX_UNLOCK (queue);
1279
1280   debug_ranges (queue);
1281
1282   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1283
1284   event =
1285       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1286       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1287       GST_SEEK_TYPE_NONE, -1);
1288
1289   res = gst_pad_push_event (queue->sinkpad, event);
1290   GST_QUEUE2_MUTEX_LOCK (queue);
1291
1292   if (res) {
1293     /* Between us sending the seek event and re-acquiring the lock, the source
1294      * thread might already have pushed data and moved along the range's
1295      * writing_pos beyond the seek offset. In that case we don't want to set
1296      * the writing position back to the requested seek position, as it would
1297      * cause data to be written to the wrong offset in the file or ring buffer.
1298      * We still do the add_range call to switch the current range to the
1299      * requested range, or create one if one doesn't exist yet. */
1300     queue->current = add_range (queue, offset, FALSE);
1301   }
1302
1303   return res;
1304 }
1305
1306 /* get the threshold for when we decide to seek rather than wait */
1307 static guint64
1308 get_seek_threshold (GstQueue2 * queue)
1309 {
1310   guint64 threshold;
1311
1312   /* FIXME, find a good threshold based on the incoming rate. */
1313   threshold = 1024 * 512;
1314
1315   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1316     threshold = MIN (threshold,
1317         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1318   }
1319   return threshold;
1320 }
1321
1322 /* see if there is enough data in the file to read a full buffer */
1323 static gboolean
1324 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1325 {
1326   GstQueue2Range *range;
1327
1328   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1329       offset, length);
1330
1331   if ((range = find_range (queue, offset))) {
1332     if (queue->current != range) {
1333       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1334       perform_seek_to_offset (queue, range->writing_pos);
1335     }
1336
1337     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1338         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1339
1340     /* we have a range for offset */
1341     GST_DEBUG_OBJECT (queue,
1342         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1343         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1344
1345     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1346       return TRUE;
1347
1348     if (offset + length <= range->writing_pos)
1349       return TRUE;
1350     else
1351       GST_DEBUG_OBJECT (queue,
1352           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1353           (offset + length) - range->writing_pos);
1354
1355   } else {
1356     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1357         " len %u", offset, length);
1358     /* we don't have the range, see how far away we are */
1359     if (!queue->is_eos && queue->current) {
1360       guint64 threshold = get_seek_threshold (queue);
1361
1362       if (offset >= queue->current->offset && offset <=
1363           queue->current->writing_pos + threshold) {
1364         GST_INFO_OBJECT (queue,
1365             "requested data is within range, wait for data");
1366         return FALSE;
1367       }
1368     }
1369
1370     /* too far away, do a seek */
1371     perform_seek_to_offset (queue, offset);
1372   }
1373
1374   return FALSE;
1375 }
1376
1377 #ifdef HAVE_FSEEKO
1378 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1379 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1380 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1381 #else
1382 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1383 #endif
1384
1385 static GstFlowReturn
1386 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1387     guint8 * dst, gint64 * read_return)
1388 {
1389   guint8 *ring_buffer;
1390   size_t res;
1391
1392   ring_buffer = queue->ring_buffer;
1393
1394   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1395     goto seek_failed;
1396
1397   /* this should not block */
1398   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1399       length, offset);
1400   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1401     res = fread (dst, 1, length, queue->temp_file);
1402   } else {
1403     memcpy (dst, ring_buffer + offset, length);
1404     res = length;
1405   }
1406
1407   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1408
1409   if (G_UNLIKELY (res < length)) {
1410     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1411       goto could_not_read;
1412     /* check for errors or EOF */
1413     if (ferror (queue->temp_file))
1414       goto could_not_read;
1415     if (feof (queue->temp_file) && length > 0)
1416       goto eos;
1417   }
1418
1419   *read_return = res;
1420
1421   return GST_FLOW_OK;
1422
1423 seek_failed:
1424   {
1425     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1426     return GST_FLOW_ERROR;
1427   }
1428 could_not_read:
1429   {
1430     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1431     return GST_FLOW_ERROR;
1432   }
1433 eos:
1434   {
1435     GST_DEBUG ("non-regular file hits EOS");
1436     return GST_FLOW_EOS;
1437   }
1438 }
1439
1440 static GstFlowReturn
1441 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1442     GstBuffer ** buffer)
1443 {
1444   GstBuffer *buf;
1445   GstMapInfo info;
1446   guint8 *data;
1447   guint64 file_offset;
1448   guint block_length, remaining, read_length;
1449   guint64 rb_size;
1450   guint64 max_size;
1451   guint64 rpos;
1452   GstFlowReturn ret = GST_FLOW_OK;
1453
1454   /* allocate the output buffer of the requested size */
1455   if (*buffer == NULL)
1456     buf = gst_buffer_new_allocate (NULL, length, NULL);
1457   else
1458     buf = *buffer;
1459
1460   if (!gst_buffer_map (buf, &info, GST_MAP_WRITE))
1461     goto buffer_write_fail;
1462   data = info.data;
1463
1464   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1465       offset);
1466
1467   rpos = offset;
1468   rb_size = queue->ring_buffer_max_size;
1469   max_size = QUEUE_MAX_BYTES (queue);
1470
1471   remaining = length;
1472   while (remaining > 0) {
1473     /* configure how much/whether to read */
1474     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1475       read_length = 0;
1476
1477       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1478         guint64 level;
1479
1480         /* calculate how far away the offset is */
1481         if (queue->current->writing_pos > rpos)
1482           level = queue->current->writing_pos - rpos;
1483         else
1484           level = 0;
1485
1486         GST_DEBUG_OBJECT (queue,
1487             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1488             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1489             rpos, queue->current->writing_pos, level, max_size);
1490
1491         if (level >= max_size) {
1492           /* we don't have the data but if we have a ring buffer that is full, we
1493            * need to read */
1494           GST_DEBUG_OBJECT (queue,
1495               "ring buffer full, reading QUEUE_MAX_BYTES %"
1496               G_GUINT64_FORMAT " bytes", max_size);
1497           read_length = max_size;
1498         } else if (queue->is_eos) {
1499           /* won't get any more data so read any data we have */
1500           if (level) {
1501             GST_DEBUG_OBJECT (queue,
1502                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1503                 level);
1504             read_length = level;
1505             remaining = level;
1506             length = level;
1507           } else
1508             goto hit_eos;
1509         }
1510       }
1511
1512       if (read_length == 0) {
1513         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1514           GST_DEBUG_OBJECT (queue,
1515               "update current position [%" G_GUINT64_FORMAT "-%"
1516               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1517           update_cur_pos (queue, queue->current, rpos);
1518           GST_QUEUE2_SIGNAL_DEL (queue);
1519         }
1520
1521         if (queue->use_buffering)
1522           update_buffering (queue);
1523
1524         GST_DEBUG_OBJECT (queue, "waiting for add");
1525         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1526         continue;
1527       }
1528     } else {
1529       /* we have the requested data so read it */
1530       read_length = remaining;
1531     }
1532
1533     /* set range reading_pos to actual reading position for this read */
1534     queue->current->reading_pos = rpos;
1535
1536     /* configure how much and from where to read */
1537     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1538       file_offset =
1539           (queue->current->rb_offset + (rpos -
1540               queue->current->offset)) % rb_size;
1541       if (file_offset + read_length > rb_size) {
1542         block_length = rb_size - file_offset;
1543       } else {
1544         block_length = read_length;
1545       }
1546     } else {
1547       file_offset = rpos;
1548       block_length = read_length;
1549     }
1550
1551     /* while we still have data to read, we loop */
1552     while (read_length > 0) {
1553       gint64 read_return;
1554
1555       ret =
1556           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1557           data, &read_return);
1558       if (ret != GST_FLOW_OK)
1559         goto read_error;
1560
1561       file_offset += read_return;
1562       if (QUEUE_IS_USING_RING_BUFFER (queue))
1563         file_offset %= rb_size;
1564
1565       data += read_return;
1566       read_length -= read_return;
1567       block_length = read_length;
1568       remaining -= read_return;
1569
1570       rpos = (queue->current->reading_pos += read_return);
1571       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1572     }
1573     GST_QUEUE2_SIGNAL_DEL (queue);
1574     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1575   }
1576
1577   gst_buffer_unmap (buf, &info);
1578   gst_buffer_resize (buf, 0, length);
1579
1580   GST_BUFFER_OFFSET (buf) = offset;
1581   GST_BUFFER_OFFSET_END (buf) = offset + length;
1582
1583   *buffer = buf;
1584
1585   return ret;
1586
1587   /* ERRORS */
1588 hit_eos:
1589   {
1590     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1591     gst_buffer_unmap (buf, &info);
1592     if (*buffer == NULL)
1593       gst_buffer_unref (buf);
1594     return GST_FLOW_EOS;
1595   }
1596 out_flushing:
1597   {
1598     GST_DEBUG_OBJECT (queue, "we are flushing");
1599     gst_buffer_unmap (buf, &info);
1600     if (*buffer == NULL)
1601       gst_buffer_unref (buf);
1602     return GST_FLOW_FLUSHING;
1603   }
1604 read_error:
1605   {
1606     GST_DEBUG_OBJECT (queue, "we have a read error");
1607     gst_buffer_unmap (buf, &info);
1608     if (*buffer == NULL)
1609       gst_buffer_unref (buf);
1610     return ret;
1611   }
1612 buffer_write_fail:
1613   {
1614     GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL),
1615         ("Can't write to buffer"));
1616     if (*buffer == NULL)
1617       gst_buffer_unref (buf);
1618     return GST_FLOW_ERROR;
1619   }
1620 }
1621
1622 /* should be called with QUEUE_LOCK */
1623 static GstMiniObject *
1624 gst_queue2_read_item_from_file (GstQueue2 * queue)
1625 {
1626   GstMiniObject *item;
1627
1628   if (queue->stream_start_event != NULL) {
1629     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1630     queue->stream_start_event = NULL;
1631   } else if (queue->starting_segment != NULL) {
1632     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1633     queue->starting_segment = NULL;
1634   } else {
1635     GstFlowReturn ret;
1636     GstBuffer *buffer = NULL;
1637     guint64 reading_pos;
1638
1639     reading_pos = queue->current->reading_pos;
1640
1641     ret =
1642         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1643         &buffer);
1644
1645     switch (ret) {
1646       case GST_FLOW_OK:
1647         item = GST_MINI_OBJECT_CAST (buffer);
1648         break;
1649       case GST_FLOW_EOS:
1650         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1651         break;
1652       default:
1653         item = NULL;
1654         break;
1655     }
1656   }
1657   return item;
1658 }
1659
1660 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1661  * the temp filename. */
1662 static gboolean
1663 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1664 {
1665   gint fd = -1;
1666   gchar *name = NULL;
1667
1668   if (queue->temp_file)
1669     goto already_opened;
1670
1671   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1672
1673   /* If temp_template was set, allocate a filename and open that file */
1674
1675   /* nothing to do */
1676   if (queue->temp_template == NULL)
1677     goto no_directory;
1678
1679   /* make copy of the template, we don't want to change this */
1680   name = g_strdup (queue->temp_template);
1681
1682 #ifdef __BIONIC__
1683   fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1684 #else
1685   fd = g_mkstemp (name);
1686 #endif
1687
1688   if (fd == -1)
1689     goto mkstemp_failed;
1690
1691   /* open the file for update/writing */
1692   queue->temp_file = fdopen (fd, "wb+");
1693   /* error creating file */
1694   if (queue->temp_file == NULL)
1695     goto open_failed;
1696
1697   g_free (queue->temp_location);
1698   queue->temp_location = name;
1699
1700   GST_QUEUE2_MUTEX_UNLOCK (queue);
1701
1702   /* we can't emit the notify with the lock */
1703   g_object_notify (G_OBJECT (queue), "temp-location");
1704
1705   GST_QUEUE2_MUTEX_LOCK (queue);
1706
1707   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1708
1709   return TRUE;
1710
1711   /* ERRORS */
1712 already_opened:
1713   {
1714     GST_DEBUG_OBJECT (queue, "temp file was already open");
1715     return TRUE;
1716   }
1717 no_directory:
1718   {
1719     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1720         (_("No Temp directory specified.")), (NULL));
1721     return FALSE;
1722   }
1723 mkstemp_failed:
1724   {
1725     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1726         (_("Could not create temp file \"%s\"."), queue->temp_template),
1727         GST_ERROR_SYSTEM);
1728     g_free (name);
1729     return FALSE;
1730   }
1731 open_failed:
1732   {
1733     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1734         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1735     g_free (name);
1736     if (fd != -1)
1737       close (fd);
1738     return FALSE;
1739   }
1740 }
1741
1742 static void
1743 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1744 {
1745   /* nothing to do */
1746   if (queue->temp_file == NULL)
1747     return;
1748
1749   GST_DEBUG_OBJECT (queue, "closing temp file");
1750
1751   fflush (queue->temp_file);
1752   fclose (queue->temp_file);
1753
1754   if (queue->temp_remove) {
1755     if (remove (queue->temp_location) < 0) {
1756       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1757           queue->temp_location, g_strerror (errno));
1758     }
1759   }
1760
1761   queue->temp_file = NULL;
1762   clean_ranges (queue);
1763 }
1764
1765 static void
1766 gst_queue2_flush_temp_file (GstQueue2 * queue)
1767 {
1768   if (queue->temp_file == NULL)
1769     return;
1770
1771   GST_DEBUG_OBJECT (queue, "flushing temp file");
1772
1773   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1774 }
1775
1776 static void
1777 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1778 {
1779   if (!QUEUE_IS_USING_QUEUE (queue)) {
1780     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1781       gst_queue2_flush_temp_file (queue);
1782     init_ranges (queue);
1783   } else {
1784     GstQueue2Item *qitem;
1785
1786     while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
1787       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1788           && GST_EVENT_IS_STICKY (qitem->item)
1789           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1790           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1791         gst_pad_store_sticky_event (queue->srcpad,
1792             GST_EVENT_CAST (qitem->item));
1793       }
1794
1795       /* Then lose another reference because we are supposed to destroy that
1796          data when flushing */
1797       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1798         gst_mini_object_unref (qitem->item);
1799     }
1800   }
1801   queue->last_query = FALSE;
1802   g_cond_signal (&queue->query_handled);
1803   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1804   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1805   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1806   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1807   queue->sink_tainted = queue->src_tainted = TRUE;
1808   if (queue->starting_segment != NULL)
1809     gst_event_unref (queue->starting_segment);
1810   queue->starting_segment = NULL;
1811   queue->segment_event_received = FALSE;
1812   gst_event_replace (&queue->stream_start_event, NULL);
1813
1814   /* we deleted a lot of something */
1815   GST_QUEUE2_SIGNAL_DEL (queue);
1816 }
1817
1818 static gboolean
1819 gst_queue2_wait_free_space (GstQueue2 * queue)
1820 {
1821   /* We make space available if we're "full" according to whatever
1822    * the user defined as "full". */
1823   if (gst_queue2_is_filled (queue)) {
1824     gboolean started;
1825
1826     /* pause the timer while we wait. The fact that we are waiting does not mean
1827      * the byterate on the input pad is lower */
1828     if ((started = queue->in_timer_started))
1829       g_timer_stop (queue->in_timer);
1830
1831     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1832         "queue is full, waiting for free space");
1833     do {
1834       /* Wait for space to be available, we could be unlocked because of a flush. */
1835       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1836     }
1837     while (gst_queue2_is_filled (queue));
1838
1839     /* and continue if we were running before */
1840     if (started)
1841       g_timer_continue (queue->in_timer);
1842   }
1843   return TRUE;
1844
1845   /* ERRORS */
1846 out_flushing:
1847   {
1848     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1849     return FALSE;
1850   }
1851 }
1852
1853 static gboolean
1854 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1855 {
1856   GstMapInfo info;
1857   guint8 *data, *ring_buffer;
1858   guint size, rb_size;
1859   guint64 writing_pos, new_writing_pos;
1860   GstQueue2Range *range, *prev, *next;
1861   gboolean do_seek = FALSE;
1862
1863   if (QUEUE_IS_USING_RING_BUFFER (queue))
1864     writing_pos = queue->current->rb_writing_pos;
1865   else
1866     writing_pos = queue->current->writing_pos;
1867   ring_buffer = queue->ring_buffer;
1868   rb_size = queue->ring_buffer_max_size;
1869
1870   if (!gst_buffer_map (buffer, &info, GST_MAP_READ))
1871     goto buffer_read_error;
1872
1873   size = info.size;
1874   data = info.data;
1875
1876   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1877       writing_pos);
1878
1879   /* sanity check */
1880   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1881       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1882     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1883         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1884         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1885   }
1886
1887   while (size > 0) {
1888     guint to_write;
1889
1890     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1891       gint64 space;
1892
1893       /* calculate the space in the ring buffer not used by data from
1894        * the current range */
1895       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1896         /* wait until there is some free space */
1897         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1898       }
1899       /* get the amount of space we have */
1900       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1901
1902       /* calculate if we need to split or if we can write the entire
1903        * buffer now */
1904       to_write = MIN (size, space);
1905
1906       /* the writing position in the ring buffer after writing (part
1907        * or all of) the buffer */
1908       new_writing_pos = (writing_pos + to_write) % rb_size;
1909
1910       prev = NULL;
1911       range = queue->ranges;
1912
1913       /* if we need to overwrite data in the ring buffer, we need to
1914        * update the ranges
1915        *
1916        * warning: this code is complicated and includes some
1917        * simplifications - pen, paper and diagrams for the cases
1918        * recommended! */
1919       while (range) {
1920         guint64 range_data_start, range_data_end;
1921         GstQueue2Range *range_to_destroy = NULL;
1922
1923         if (range == queue->current)
1924           goto next_range;
1925
1926         range_data_start = range->rb_offset;
1927         range_data_end = range->rb_writing_pos;
1928
1929         /* handle the special case where the range has no data in it */
1930         if (range->writing_pos == range->offset) {
1931           if (range != queue->current) {
1932             GST_DEBUG_OBJECT (queue,
1933                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1934                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1935             /* remove range */
1936             range_to_destroy = range;
1937             if (prev)
1938               prev->next = range->next;
1939           }
1940           goto next_range;
1941         }
1942
1943         if (range_data_end > range_data_start) {
1944           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1945             goto next_range;
1946
1947           if (new_writing_pos > range_data_start) {
1948             if (new_writing_pos >= range_data_end) {
1949               GST_DEBUG_OBJECT (queue,
1950                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1951                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1952               /* remove range */
1953               range_to_destroy = range;
1954               if (prev)
1955                 prev->next = range->next;
1956             } else {
1957               GST_DEBUG_OBJECT (queue,
1958                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1959                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1960                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1961                   range->offset + new_writing_pos - range_data_start,
1962                   new_writing_pos);
1963               range->offset += (new_writing_pos - range_data_start);
1964               range->rb_offset = new_writing_pos;
1965             }
1966           }
1967         } else {
1968           guint64 new_wpos_virt = writing_pos + to_write;
1969
1970           if (new_wpos_virt <= range_data_start)
1971             goto next_range;
1972
1973           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1974             GST_DEBUG_OBJECT (queue,
1975                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1976                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1977             /* remove range */
1978             range_to_destroy = range;
1979             if (prev)
1980               prev->next = range->next;
1981           } else {
1982             GST_DEBUG_OBJECT (queue,
1983                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1984                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1985                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1986                 range->offset + new_writing_pos - range_data_start,
1987                 new_writing_pos);
1988             range->offset += (new_wpos_virt - range_data_start);
1989             range->rb_offset = new_writing_pos;
1990           }
1991         }
1992
1993       next_range:
1994         if (!range_to_destroy)
1995           prev = range;
1996
1997         range = range->next;
1998         if (range_to_destroy) {
1999           if (range_to_destroy == queue->ranges)
2000             queue->ranges = range;
2001           g_slice_free (GstQueue2Range, range_to_destroy);
2002           range_to_destroy = NULL;
2003         }
2004       }
2005     } else {
2006       to_write = size;
2007       new_writing_pos = writing_pos + to_write;
2008     }
2009
2010     if (QUEUE_IS_USING_TEMP_FILE (queue)
2011         && FSEEK_FILE (queue->temp_file, writing_pos))
2012       goto seek_failed;
2013
2014     if (new_writing_pos > writing_pos) {
2015       GST_INFO_OBJECT (queue,
2016           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
2017           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
2018           queue->current->writing_pos, queue->current->rb_writing_pos);
2019       /* either not using ring buffer or no wrapping, just write */
2020       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2021         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
2022           goto handle_error;
2023       } else {
2024         memcpy (ring_buffer + writing_pos, data, to_write);
2025       }
2026
2027       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
2028         /* try to merge with next range */
2029         while ((next = queue->current->next)) {
2030           GST_INFO_OBJECT (queue,
2031               "checking merge with next range %" G_GUINT64_FORMAT " < %"
2032               G_GUINT64_FORMAT, new_writing_pos, next->offset);
2033           if (new_writing_pos < next->offset)
2034             break;
2035
2036           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
2037               next->writing_pos);
2038
2039           /* remove the group */
2040           queue->current->next = next->next;
2041
2042           /* We use the threshold to decide if we want to do a seek or simply
2043            * read the data again. If there is not so much data in the range we
2044            * prefer to avoid to seek and read it again. */
2045           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
2046             /* the new range had more data than the threshold, it's worth keeping
2047              * it and doing a seek. */
2048             new_writing_pos = next->writing_pos;
2049             do_seek = TRUE;
2050           }
2051           g_slice_free (GstQueue2Range, next);
2052         }
2053         goto update_and_signal;
2054       }
2055     } else {
2056       /* wrapping */
2057       guint block_one, block_two;
2058
2059       block_one = rb_size - writing_pos;
2060       block_two = to_write - block_one;
2061
2062       if (block_one > 0) {
2063         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
2064         /* write data to end of ring buffer */
2065         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2066           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
2067             goto handle_error;
2068         } else {
2069           memcpy (ring_buffer + writing_pos, data, block_one);
2070         }
2071       }
2072
2073       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
2074         goto seek_failed;
2075
2076       if (block_two > 0) {
2077         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
2078         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2079           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
2080             goto handle_error;
2081         } else {
2082           memcpy (ring_buffer, data + block_one, block_two);
2083         }
2084       }
2085     }
2086
2087   update_and_signal:
2088     /* update the writing positions */
2089     size -= to_write;
2090     GST_INFO_OBJECT (queue,
2091         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
2092         to_write, writing_pos, size);
2093
2094     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2095       data += to_write;
2096       queue->current->writing_pos += to_write;
2097       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
2098     } else {
2099       queue->current->writing_pos = writing_pos = new_writing_pos;
2100     }
2101     if (do_seek)
2102       perform_seek_to_offset (queue, new_writing_pos);
2103
2104     update_cur_level (queue, queue->current);
2105
2106     /* update the buffering status */
2107     if (queue->use_buffering) {
2108       GstMessage *msg;
2109       update_buffering (queue);
2110       msg = gst_queue2_get_buffering_message (queue);
2111       if (msg) {
2112         GST_QUEUE2_MUTEX_UNLOCK (queue);
2113         g_mutex_lock (&queue->buffering_post_lock);
2114         gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
2115         g_mutex_unlock (&queue->buffering_post_lock);
2116         GST_QUEUE2_MUTEX_LOCK (queue);
2117       }
2118     }
2119
2120     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2121         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2122
2123     GST_QUEUE2_SIGNAL_ADD (queue);
2124   }
2125
2126   gst_buffer_unmap (buffer, &info);
2127
2128   return TRUE;
2129
2130   /* ERRORS */
2131 out_flushing:
2132   {
2133     GST_DEBUG_OBJECT (queue, "we are flushing");
2134     gst_buffer_unmap (buffer, &info);
2135     /* FIXME - GST_FLOW_EOS ? */
2136     return FALSE;
2137   }
2138 seek_failed:
2139   {
2140     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2141     gst_buffer_unmap (buffer, &info);
2142     return FALSE;
2143   }
2144 handle_error:
2145   {
2146     switch (errno) {
2147       case ENOSPC:{
2148         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2149         break;
2150       }
2151       default:{
2152         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2153             (_("Error while writing to download file.")),
2154             ("%s", g_strerror (errno)));
2155       }
2156     }
2157     gst_buffer_unmap (buffer, &info);
2158     return FALSE;
2159   }
2160 buffer_read_error:
2161   {
2162     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL),
2163         ("Can't read from buffer"));
2164     return FALSE;
2165   }
2166 }
2167
2168 static gboolean
2169 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2170 {
2171   GstQueue2 *queue = q;
2172
2173   GST_TRACE_OBJECT (queue,
2174       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2175       gst_buffer_get_size (*buf));
2176
2177   if (!gst_queue2_create_write (queue, *buf)) {
2178     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2179     return FALSE;
2180   }
2181   return TRUE;
2182 }
2183
2184 /* enqueue an item an update the level stats */
2185 static void
2186 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2187     GstQueue2ItemType item_type)
2188 {
2189   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2190     GstBuffer *buffer;
2191     guint size;
2192
2193     buffer = GST_BUFFER_CAST (item);
2194     size = gst_buffer_get_size (buffer);
2195
2196     /* add buffer to the statistics */
2197     if (QUEUE_IS_USING_QUEUE (queue)) {
2198       queue->cur_level.buffers++;
2199       queue->cur_level.bytes += size;
2200     }
2201     queue->bytes_in += size;
2202
2203     /* apply new buffer to segment stats */
2204     apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE);
2205     /* update the byterate stats */
2206     update_in_rates (queue, FALSE);
2207
2208     if (!QUEUE_IS_USING_QUEUE (queue)) {
2209       /* FIXME - check return value? */
2210       gst_queue2_create_write (queue, buffer);
2211     }
2212   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2213     GstBufferList *buffer_list;
2214     guint size;
2215
2216     buffer_list = GST_BUFFER_LIST_CAST (item);
2217
2218     size = gst_buffer_list_calculate_size (buffer_list);
2219     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2220
2221     /* add buffer to the statistics */
2222     if (QUEUE_IS_USING_QUEUE (queue)) {
2223       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2224       queue->cur_level.bytes += size;
2225     }
2226     queue->bytes_in += size;
2227
2228     /* apply new buffer to segment stats */
2229     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2230
2231     /* update the byterate stats */
2232     update_in_rates (queue, FALSE);
2233
2234     if (!QUEUE_IS_USING_QUEUE (queue)) {
2235       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2236     }
2237   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2238     GstEvent *event;
2239
2240     event = GST_EVENT_CAST (item);
2241
2242     switch (GST_EVENT_TYPE (event)) {
2243       case GST_EVENT_EOS:
2244         /* Zero the thresholds, this makes sure the queue is completely
2245          * filled and we can read all data from the queue. */
2246         GST_DEBUG_OBJECT (queue, "we have EOS");
2247         queue->is_eos = TRUE;
2248         /* Force updating the input bitrate */
2249         update_in_rates (queue, TRUE);
2250         break;
2251       case GST_EVENT_SEGMENT:
2252         apply_segment (queue, event, &queue->sink_segment, TRUE);
2253         /* This is our first new segment, we hold it
2254          * as we can't save it on the temp file */
2255         if (!QUEUE_IS_USING_QUEUE (queue)) {
2256           if (queue->segment_event_received)
2257             goto unexpected_event;
2258
2259           queue->segment_event_received = TRUE;
2260           if (queue->starting_segment != NULL)
2261             gst_event_unref (queue->starting_segment);
2262           queue->starting_segment = event;
2263           item = NULL;
2264         }
2265         /* a new segment allows us to accept more buffers if we got EOS
2266          * from downstream */
2267         queue->unexpected = FALSE;
2268         break;
2269       case GST_EVENT_GAP:
2270         apply_gap (queue, event, &queue->sink_segment, TRUE);
2271         break;
2272       case GST_EVENT_STREAM_START:
2273         if (!QUEUE_IS_USING_QUEUE (queue)) {
2274           gst_event_replace (&queue->stream_start_event, event);
2275           gst_event_unref (event);
2276           item = NULL;
2277         }
2278         break;
2279       case GST_EVENT_CAPS:{
2280         GstCaps *caps;
2281
2282         gst_event_parse_caps (event, &caps);
2283         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2284
2285         if (!QUEUE_IS_USING_QUEUE (queue)) {
2286           GST_LOG ("Dropping caps event, not using queue");
2287           gst_event_unref (event);
2288           item = NULL;
2289         }
2290         break;
2291       }
2292       default:
2293         if (!QUEUE_IS_USING_QUEUE (queue))
2294           goto unexpected_event;
2295         break;
2296     }
2297   } else if (GST_IS_QUERY (item)) {
2298     /* Can't happen as we check that in the caller */
2299     if (!QUEUE_IS_USING_QUEUE (queue))
2300       g_assert_not_reached ();
2301   } else {
2302     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2303         item, GST_OBJECT_NAME (queue));
2304     /* we can't really unref since we don't know what it is */
2305     item = NULL;
2306   }
2307
2308   if (item) {
2309     /* update the buffering status */
2310     if (queue->use_buffering)
2311       update_buffering (queue);
2312
2313     if (QUEUE_IS_USING_QUEUE (queue)) {
2314       GstQueue2Item qitem;
2315
2316       qitem.type = item_type;
2317       qitem.item = item;
2318       gst_queue_array_push_tail_struct (queue->queue, &qitem);
2319     } else {
2320       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2321     }
2322
2323     GST_QUEUE2_SIGNAL_ADD (queue);
2324   }
2325
2326   return;
2327
2328   /* ERRORS */
2329 unexpected_event:
2330   {
2331     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2332
2333     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2334         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2335         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2336     gst_event_unref (GST_EVENT_CAST (item));
2337     return;
2338   }
2339 }
2340
2341 /* dequeue an item from the queue and update level stats */
2342 static GstMiniObject *
2343 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2344 {
2345   GstMiniObject *item;
2346
2347   if (!QUEUE_IS_USING_QUEUE (queue)) {
2348     item = gst_queue2_read_item_from_file (queue);
2349   } else {
2350     GstQueue2Item *qitem = gst_queue_array_pop_head_struct (queue->queue);
2351
2352     if (qitem == NULL)
2353       goto no_item;
2354
2355     item = qitem->item;
2356   }
2357
2358   if (item == NULL)
2359     goto no_item;
2360
2361   if (GST_IS_BUFFER (item)) {
2362     GstBuffer *buffer;
2363     guint size;
2364
2365     buffer = GST_BUFFER_CAST (item);
2366     size = gst_buffer_get_size (buffer);
2367     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2368
2369     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2370         "retrieved buffer %p from queue", buffer);
2371
2372     if (QUEUE_IS_USING_QUEUE (queue)) {
2373       queue->cur_level.buffers--;
2374       queue->cur_level.bytes -= size;
2375     }
2376     queue->bytes_out += size;
2377
2378     apply_buffer (queue, buffer, &queue->src_segment, size, FALSE);
2379     /* update the byterate stats */
2380     update_out_rates (queue);
2381     /* update the buffering */
2382     if (queue->use_buffering)
2383       update_buffering (queue);
2384
2385   } else if (GST_IS_EVENT (item)) {
2386     GstEvent *event = GST_EVENT_CAST (item);
2387
2388     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2389
2390     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2391         "retrieved event %p from queue", event);
2392
2393     switch (GST_EVENT_TYPE (event)) {
2394       case GST_EVENT_EOS:
2395         /* queue is empty now that we dequeued the EOS */
2396         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2397         break;
2398       case GST_EVENT_SEGMENT:
2399         apply_segment (queue, event, &queue->src_segment, FALSE);
2400         break;
2401       case GST_EVENT_GAP:
2402         apply_gap (queue, event, &queue->src_segment, FALSE);
2403         break;
2404       default:
2405         break;
2406     }
2407   } else if (GST_IS_BUFFER_LIST (item)) {
2408     GstBufferList *buffer_list;
2409     guint size;
2410
2411     buffer_list = GST_BUFFER_LIST_CAST (item);
2412     size = gst_buffer_list_calculate_size (buffer_list);
2413     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2414
2415     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2416         "retrieved buffer list %p from queue", buffer_list);
2417
2418     if (QUEUE_IS_USING_QUEUE (queue)) {
2419       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2420       queue->cur_level.bytes -= size;
2421     }
2422     queue->bytes_out += size;
2423
2424     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2425     /* update the byterate stats */
2426     update_out_rates (queue);
2427     /* update the buffering */
2428     if (queue->use_buffering)
2429       update_buffering (queue);
2430   } else if (GST_IS_QUERY (item)) {
2431     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2432         "retrieved query %p from queue", item);
2433     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2434   } else {
2435     g_warning
2436         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2437         item, GST_OBJECT_NAME (queue));
2438     item = NULL;
2439     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2440   }
2441   GST_QUEUE2_SIGNAL_DEL (queue);
2442
2443   return item;
2444
2445   /* ERRORS */
2446 no_item:
2447   {
2448     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2449     return NULL;
2450   }
2451 }
2452
2453 static GstFlowReturn
2454 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2455     GstEvent * event)
2456 {
2457   gboolean ret = TRUE;
2458   GstQueue2 *queue;
2459
2460   queue = GST_QUEUE2 (parent);
2461
2462   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
2463       GST_EVENT_TYPE_NAME (event));
2464
2465   switch (GST_EVENT_TYPE (event)) {
2466     case GST_EVENT_FLUSH_START:
2467     {
2468       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2469         /* forward event */
2470         ret = gst_pad_push_event (queue->srcpad, event);
2471
2472         /* now unblock the chain function */
2473         GST_QUEUE2_MUTEX_LOCK (queue);
2474         queue->srcresult = GST_FLOW_FLUSHING;
2475         queue->sinkresult = GST_FLOW_FLUSHING;
2476         /* unblock the loop and chain functions */
2477         GST_QUEUE2_SIGNAL_ADD (queue);
2478         GST_QUEUE2_SIGNAL_DEL (queue);
2479         GST_QUEUE2_MUTEX_UNLOCK (queue);
2480
2481         /* make sure it pauses, this should happen since we sent
2482          * flush_start downstream. */
2483         gst_pad_pause_task (queue->srcpad);
2484         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2485
2486         GST_QUEUE2_MUTEX_LOCK (queue);
2487         queue->last_query = FALSE;
2488         g_cond_signal (&queue->query_handled);
2489         GST_QUEUE2_MUTEX_UNLOCK (queue);
2490       } else {
2491         GST_QUEUE2_MUTEX_LOCK (queue);
2492         /* flush the sink pad */
2493         queue->sinkresult = GST_FLOW_FLUSHING;
2494         GST_QUEUE2_SIGNAL_DEL (queue);
2495         queue->last_query = FALSE;
2496         g_cond_signal (&queue->query_handled);
2497         GST_QUEUE2_MUTEX_UNLOCK (queue);
2498
2499         gst_event_unref (event);
2500       }
2501       break;
2502     }
2503     case GST_EVENT_FLUSH_STOP:
2504     {
2505       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2506         /* forward event */
2507         ret = gst_pad_push_event (queue->srcpad, event);
2508
2509         GST_QUEUE2_MUTEX_LOCK (queue);
2510         gst_queue2_locked_flush (queue, FALSE, TRUE);
2511         queue->srcresult = GST_FLOW_OK;
2512         queue->sinkresult = GST_FLOW_OK;
2513         queue->is_eos = FALSE;
2514         queue->unexpected = FALSE;
2515         queue->seeking = FALSE;
2516         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2517         /* reset rate counters */
2518         reset_rate_timer (queue);
2519         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2520             queue->srcpad, NULL);
2521         GST_QUEUE2_MUTEX_UNLOCK (queue);
2522       } else {
2523         GST_QUEUE2_MUTEX_LOCK (queue);
2524         queue->segment_event_received = FALSE;
2525         queue->is_eos = FALSE;
2526         queue->unexpected = FALSE;
2527         queue->sinkresult = GST_FLOW_OK;
2528         queue->seeking = FALSE;
2529         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2530         GST_QUEUE2_MUTEX_UNLOCK (queue);
2531
2532         gst_event_unref (event);
2533       }
2534       break;
2535     }
2536     case GST_EVENT_TAG:{
2537       if (queue->use_tags_bitrate) {
2538         GstTagList *tags;
2539         guint bitrate;
2540
2541         gst_event_parse_tag (event, &tags);
2542         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2543             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2544           GST_QUEUE2_MUTEX_LOCK (queue);
2545           queue->sink_tags_bitrate = bitrate;
2546           GST_QUEUE2_MUTEX_UNLOCK (queue);
2547           GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
2548         }
2549       }
2550       /* Fall-through */
2551     }
2552     default:
2553       if (GST_EVENT_IS_SERIALIZED (event)) {
2554         /* serialized events go in the queue */
2555
2556         /* STREAM_START and SEGMENT reset the EOS status of a
2557          * pad. Change the cached sinkpad flow result accordingly */
2558         if (queue->sinkresult == GST_FLOW_EOS
2559             && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
2560                 || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
2561           queue->sinkresult = GST_FLOW_OK;
2562
2563         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2564         if (queue->srcresult != GST_FLOW_OK) {
2565           /* Errors in sticky event pushing are no problem and ignored here
2566            * as they will cause more meaningful errors during data flow.
2567            * For EOS events, that are not followed by data flow, we still
2568            * return FALSE here though and report an error.
2569            */
2570           if (!GST_EVENT_IS_STICKY (event)) {
2571             goto out_flow_error;
2572           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2573             if (queue->srcresult == GST_FLOW_NOT_LINKED
2574                 || queue->srcresult < GST_FLOW_EOS) {
2575               GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
2576             }
2577             goto out_flow_error;
2578           }
2579         }
2580
2581         /* refuse more events on EOS unless they unset the EOS status */
2582         if (queue->is_eos) {
2583           switch (GST_EVENT_TYPE (event)) {
2584             case GST_EVENT_STREAM_START:
2585             case GST_EVENT_SEGMENT:
2586               /* Restart the loop */
2587               if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2588                 queue->srcresult = GST_FLOW_OK;
2589                 queue->is_eos = FALSE;
2590                 queue->unexpected = FALSE;
2591                 queue->seeking = FALSE;
2592                 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2593                 /* reset rate counters */
2594                 reset_rate_timer (queue);
2595                 gst_pad_start_task (queue->srcpad,
2596                     (GstTaskFunction) gst_queue2_loop, queue->srcpad, NULL);
2597               } else {
2598                 queue->is_eos = FALSE;
2599                 queue->unexpected = FALSE;
2600                 queue->seeking = FALSE;
2601                 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2602               }
2603
2604               break;
2605             default:
2606               goto out_eos;
2607           }
2608         }
2609
2610         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2611         GST_QUEUE2_MUTEX_UNLOCK (queue);
2612         gst_queue2_post_buffering (queue);
2613       } else {
2614         /* non-serialized events are passed downstream. */
2615         ret = gst_pad_push_event (queue->srcpad, event);
2616       }
2617       break;
2618   }
2619   if (ret == FALSE)
2620     return GST_FLOW_ERROR;
2621   return GST_FLOW_OK;
2622
2623   /* ERRORS */
2624 out_flushing:
2625   {
2626     GstFlowReturn ret = queue->sinkresult;
2627     GST_DEBUG_OBJECT (queue, "refusing event, we are %s",
2628         gst_flow_get_name (ret));
2629     GST_QUEUE2_MUTEX_UNLOCK (queue);
2630     gst_event_unref (event);
2631     return ret;
2632   }
2633 out_eos:
2634   {
2635     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2636     GST_QUEUE2_MUTEX_UNLOCK (queue);
2637     gst_event_unref (event);
2638     return GST_FLOW_EOS;
2639   }
2640 out_flow_error:
2641   {
2642     GST_LOG_OBJECT (queue,
2643         "refusing event, we have a downstream flow error: %s",
2644         gst_flow_get_name (queue->srcresult));
2645     GST_QUEUE2_MUTEX_UNLOCK (queue);
2646     gst_event_unref (event);
2647     return queue->srcresult;
2648   }
2649 }
2650
2651 static gboolean
2652 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2653     GstQuery * query)
2654 {
2655   GstQueue2 *queue;
2656   gboolean res;
2657
2658   queue = GST_QUEUE2 (parent);
2659
2660   switch (GST_QUERY_TYPE (query)) {
2661     default:
2662       if (GST_QUERY_IS_SERIALIZED (query)) {
2663         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2664         /* serialized events go in the queue. We need to be certain that we
2665          * don't cause deadlocks waiting for the query return value. We check if
2666          * the queue is empty (nothing is blocking downstream and the query can
2667          * be pushed for sure) or we are not buffering. If we are buffering,
2668          * the pipeline waits to unblock downstream until our queue fills up
2669          * completely, which can not happen if we block on the query..
2670          * Therefore we only potentially block when we are not buffering. */
2671         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2672         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2673                 || !queue->use_buffering)) {
2674           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2675             gst_queue2_locked_enqueue (queue, query,
2676                 GST_QUEUE2_ITEM_TYPE_QUERY);
2677
2678             STATUS (queue, queue->sinkpad, "wait for QUERY");
2679             while (queue->sinkresult == GST_FLOW_OK &&
2680                 queue->last_handled_query != query)
2681               g_cond_wait (&queue->query_handled, &queue->qlock);
2682             queue->last_handled_query = NULL;
2683             if (queue->sinkresult != GST_FLOW_OK)
2684               goto out_flushing;
2685             res = queue->last_query;
2686           } else {
2687             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2688             res = FALSE;
2689           }
2690         } else {
2691           GST_DEBUG_OBJECT (queue,
2692               "refusing query, we are not using the queue");
2693           res = FALSE;
2694         }
2695         GST_QUEUE2_MUTEX_UNLOCK (queue);
2696         gst_queue2_post_buffering (queue);
2697       } else {
2698         res = gst_pad_query_default (pad, parent, query);
2699       }
2700       break;
2701   }
2702   return res;
2703
2704   /* ERRORS */
2705 out_flushing:
2706   {
2707     GST_DEBUG_OBJECT (queue, "refusing query, we are %s",
2708         gst_flow_get_name (queue->sinkresult));
2709     GST_QUEUE2_MUTEX_UNLOCK (queue);
2710     return FALSE;
2711   }
2712 }
2713
2714 static gboolean
2715 gst_queue2_is_empty (GstQueue2 * queue)
2716 {
2717   /* never empty on EOS */
2718   if (queue->is_eos)
2719     return FALSE;
2720
2721   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2722     return queue->current->writing_pos <= queue->current->max_reading_pos;
2723   } else {
2724     if (gst_queue_array_get_length (queue->queue) == 0)
2725       return TRUE;
2726   }
2727
2728   return FALSE;
2729 }
2730
2731 static gboolean
2732 gst_queue2_is_filled (GstQueue2 * queue)
2733 {
2734   gboolean res;
2735
2736   /* always filled on EOS */
2737   if (queue->is_eos)
2738     return TRUE;
2739
2740 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2741     (queue->cur_level.format) >= ((alt_max) ? \
2742       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2743
2744   /* if using a ring buffer we're filled if all ring buffer space is used
2745    * _by the current range_ */
2746   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2747     guint64 rb_size = queue->ring_buffer_max_size;
2748     GST_DEBUG_OBJECT (queue,
2749         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2750         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2751     return CHECK_FILLED (bytes, rb_size);
2752   }
2753
2754   /* if using file, we're never filled if we don't have EOS */
2755   if (QUEUE_IS_USING_TEMP_FILE (queue))
2756     return FALSE;
2757
2758   /* we are never filled when we have no buffers at all */
2759   if (queue->cur_level.buffers == 0)
2760     return FALSE;
2761
2762   /* we are filled if one of the current levels exceeds the max */
2763   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2764       || CHECK_FILLED (time, 0);
2765
2766   /* if we need to, use the rate estimate to check against the max time we are
2767    * allowed to queue */
2768   if (queue->use_rate_estimate)
2769     res |= CHECK_FILLED (rate_time, 0);
2770
2771 #undef CHECK_FILLED
2772   return res;
2773 }
2774
2775 static GstFlowReturn
2776 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2777     GstMiniObject * item, GstQueue2ItemType item_type)
2778 {
2779   /* we have to lock the queue since we span threads */
2780   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2781   /* when we received EOS, we refuse more data */
2782   if (queue->is_eos)
2783     goto out_eos;
2784   /* when we received unexpected from downstream, refuse more buffers */
2785   if (queue->unexpected)
2786     goto out_unexpected;
2787
2788   /* while we didn't receive the newsegment, we're seeking and we skip data */
2789   if (queue->seeking)
2790     goto out_seeking;
2791
2792   if (!gst_queue2_wait_free_space (queue))
2793     goto out_flushing;
2794
2795   /* put buffer in queue now */
2796   gst_queue2_locked_enqueue (queue, item, item_type);
2797   GST_QUEUE2_MUTEX_UNLOCK (queue);
2798   gst_queue2_post_buffering (queue);
2799
2800   return GST_FLOW_OK;
2801
2802   /* special conditions */
2803 out_flushing:
2804   {
2805     GstFlowReturn ret = queue->sinkresult;
2806
2807     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2808         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2809     GST_QUEUE2_MUTEX_UNLOCK (queue);
2810     gst_mini_object_unref (item);
2811
2812     return ret;
2813   }
2814 out_eos:
2815   {
2816     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2817     GST_QUEUE2_MUTEX_UNLOCK (queue);
2818     gst_mini_object_unref (item);
2819
2820     return GST_FLOW_EOS;
2821   }
2822 out_seeking:
2823   {
2824     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2825     GST_QUEUE2_MUTEX_UNLOCK (queue);
2826     gst_mini_object_unref (item);
2827
2828     return GST_FLOW_OK;
2829   }
2830 out_unexpected:
2831   {
2832     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2833     GST_QUEUE2_MUTEX_UNLOCK (queue);
2834     gst_mini_object_unref (item);
2835
2836     return GST_FLOW_EOS;
2837   }
2838 }
2839
2840 static GstFlowReturn
2841 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2842 {
2843   GstQueue2 *queue;
2844
2845   queue = GST_QUEUE2 (parent);
2846
2847   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2848       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2849       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2850       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2851       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2852
2853   return gst_queue2_chain_buffer_or_buffer_list (queue,
2854       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2855 }
2856
2857 static GstFlowReturn
2858 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2859     GstBufferList * buffer_list)
2860 {
2861   GstQueue2 *queue;
2862
2863   queue = GST_QUEUE2 (parent);
2864
2865   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2866       "received buffer list %p", buffer_list);
2867
2868   return gst_queue2_chain_buffer_or_buffer_list (queue,
2869       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2870 }
2871
2872 static GstMiniObject *
2873 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2874 {
2875   GstMiniObject *data;
2876
2877   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2878
2879   /* stop pushing buffers, we dequeue all items until we see an item that we
2880    * can push again, which is EOS or SEGMENT. If there is nothing in the
2881    * queue we can push, we set a flag to make the sinkpad refuse more
2882    * buffers with an EOS return value until we receive something
2883    * pushable again or we get flushed. */
2884   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2885     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2886       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2887           "dropping EOS buffer %p", data);
2888       gst_buffer_unref (GST_BUFFER_CAST (data));
2889     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2890       GstEvent *event = GST_EVENT_CAST (data);
2891       GstEventType type = GST_EVENT_TYPE (event);
2892
2893       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
2894           || type == GST_EVENT_STREAM_START) {
2895         /* we found a pushable item in the queue, push it out */
2896         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2897             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2898         return data;
2899       }
2900       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2901           "dropping EOS event %p", event);
2902       gst_event_unref (event);
2903     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2904       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2905           "dropping EOS buffer list %p", data);
2906       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2907     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2908       queue->last_query = FALSE;
2909       g_cond_signal (&queue->query_handled);
2910       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2911     }
2912   }
2913   /* no more items in the queue. Set the unexpected flag so that upstream
2914    * make us refuse any more buffers on the sinkpad. Since we will still
2915    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2916    * task function does not shut down. */
2917   queue->unexpected = TRUE;
2918   return NULL;
2919 }
2920
2921 /* dequeue an item from the queue an push it downstream. This functions returns
2922  * the result of the push. */
2923 static GstFlowReturn
2924 gst_queue2_push_one (GstQueue2 * queue)
2925 {
2926   GstFlowReturn result;
2927   GstMiniObject *data;
2928   GstQueue2ItemType item_type;
2929
2930   data = gst_queue2_locked_dequeue (queue, &item_type);
2931   if (data == NULL)
2932     goto no_item;
2933
2934 next:
2935   result = queue->srcresult;
2936   STATUS (queue, queue->srcpad, "We have something dequeud");
2937   g_atomic_int_set (&queue->downstream_may_block,
2938       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2939       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2940   GST_QUEUE2_MUTEX_UNLOCK (queue);
2941   gst_queue2_post_buffering (queue);
2942
2943   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2944     GstBuffer *buffer;
2945
2946     buffer = GST_BUFFER_CAST (data);
2947
2948     result = gst_pad_push (queue->srcpad, buffer);
2949     g_atomic_int_set (&queue->downstream_may_block, 0);
2950
2951     /* need to check for srcresult here as well */
2952     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2953     if (result == GST_FLOW_EOS) {
2954       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2955       if (data != NULL)
2956         goto next;
2957       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2958        * to the caller so that the task function does not shut down */
2959       result = GST_FLOW_OK;
2960     }
2961   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2962     GstEvent *event = GST_EVENT_CAST (data);
2963     GstEventType type = GST_EVENT_TYPE (event);
2964
2965     if (type == GST_EVENT_TAG) {
2966       if (queue->use_tags_bitrate) {
2967         GstTagList *tags;
2968         guint bitrate;
2969
2970         gst_event_parse_tag (event, &tags);
2971         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2972             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2973           GST_QUEUE2_MUTEX_LOCK (queue);
2974           queue->src_tags_bitrate = bitrate;
2975           GST_QUEUE2_MUTEX_UNLOCK (queue);
2976           GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
2977         }
2978       }
2979     }
2980
2981     gst_pad_push_event (queue->srcpad, event);
2982
2983     /* if we're EOS, return EOS so that the task pauses. */
2984     if (type == GST_EVENT_EOS) {
2985       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2986           "pushed EOS event %p, return EOS", event);
2987       result = GST_FLOW_EOS;
2988     }
2989
2990     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2991   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2992     GstBufferList *buffer_list;
2993
2994     buffer_list = GST_BUFFER_LIST_CAST (data);
2995
2996     result = gst_pad_push_list (queue->srcpad, buffer_list);
2997     g_atomic_int_set (&queue->downstream_may_block, 0);
2998
2999     /* need to check for srcresult here as well */
3000     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3001     if (result == GST_FLOW_EOS) {
3002       data = gst_queue2_dequeue_on_eos (queue, &item_type);
3003       if (data != NULL)
3004         goto next;
3005       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
3006        * to the caller so that the task function does not shut down */
3007       result = GST_FLOW_OK;
3008     }
3009   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
3010     GstQuery *query = GST_QUERY_CAST (data);
3011
3012     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
3013     queue->last_handled_query = query;
3014     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
3015     GST_LOG_OBJECT (queue->srcpad, "Peered query");
3016     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3017         "did query %p, return %d", query, queue->last_query);
3018     g_cond_signal (&queue->query_handled);
3019     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3020     result = GST_FLOW_OK;
3021   }
3022   return result;
3023
3024   /* ERRORS */
3025 no_item:
3026   {
3027     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3028         "exit because we have no item in the queue");
3029     return GST_FLOW_ERROR;
3030   }
3031 out_flushing:
3032   {
3033     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are %s",
3034         gst_flow_get_name (queue->srcresult));
3035     return queue->srcresult;
3036   }
3037 }
3038
3039 /* called repeatedly with @pad as the source pad. This function should push out
3040  * data to the peer element. */
3041 static void
3042 gst_queue2_loop (GstPad * pad)
3043 {
3044   GstQueue2 *queue;
3045   GstFlowReturn ret;
3046
3047   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
3048
3049   /* have to lock for thread-safety */
3050   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3051
3052   if (gst_queue2_is_empty (queue)) {
3053     gboolean started;
3054
3055     /* pause the timer while we wait. The fact that we are waiting does not mean
3056      * the byterate on the output pad is lower */
3057     if ((started = queue->out_timer_started))
3058       g_timer_stop (queue->out_timer);
3059
3060     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
3061         "queue is empty, waiting for new data");
3062     do {
3063       /* Wait for data to be available, we could be unlocked because of a flush. */
3064       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
3065     }
3066     while (gst_queue2_is_empty (queue));
3067
3068     /* and continue if we were running before */
3069     if (started)
3070       g_timer_continue (queue->out_timer);
3071   }
3072   ret = gst_queue2_push_one (queue);
3073   queue->srcresult = ret;
3074   queue->sinkresult = ret;
3075   if (ret != GST_FLOW_OK)
3076     goto out_flushing;
3077
3078   GST_QUEUE2_MUTEX_UNLOCK (queue);
3079   gst_queue2_post_buffering (queue);
3080
3081   return;
3082
3083   /* ERRORS */
3084 out_flushing:
3085   {
3086     gboolean eos = queue->is_eos;
3087     GstFlowReturn ret = queue->srcresult;
3088
3089     gst_pad_pause_task (queue->srcpad);
3090     if (ret == GST_FLOW_FLUSHING) {
3091       gst_queue2_locked_flush (queue, FALSE, FALSE);
3092     } else {
3093       GST_QUEUE2_SIGNAL_DEL (queue);
3094       queue->last_query = FALSE;
3095       g_cond_signal (&queue->query_handled);
3096     }
3097     GST_QUEUE2_MUTEX_UNLOCK (queue);
3098     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3099         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
3100     /* Recalculate buffering levels before stopping since the source flow
3101      * might cause a different buffering level (like NOT_LINKED making
3102      * the queue appear as full) */
3103     if (queue->use_buffering)
3104       update_buffering (queue);
3105     gst_queue2_post_buffering (queue);
3106     /* let app know about us giving up if upstream is not expected to do so */
3107     /* EOS is already taken care of elsewhere */
3108     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
3109       GST_ELEMENT_FLOW_ERROR (queue, ret);
3110       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
3111     }
3112     return;
3113   }
3114 }
3115
3116 static gboolean
3117 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3118 {
3119   gboolean res = TRUE;
3120   GstQueue2 *queue = GST_QUEUE2 (parent);
3121
3122 #ifndef GST_DISABLE_GST_DEBUG
3123   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
3124       event, GST_EVENT_TYPE_NAME (event));
3125 #endif
3126
3127   switch (GST_EVENT_TYPE (event)) {
3128     case GST_EVENT_FLUSH_START:
3129       if (QUEUE_IS_USING_QUEUE (queue)) {
3130         /* just forward upstream */
3131         res = gst_pad_push_event (queue->sinkpad, event);
3132       } else {
3133         /* now unblock the getrange function */
3134         GST_QUEUE2_MUTEX_LOCK (queue);
3135         GST_DEBUG_OBJECT (queue, "flushing");
3136         queue->srcresult = GST_FLOW_FLUSHING;
3137         GST_QUEUE2_SIGNAL_ADD (queue);
3138         GST_QUEUE2_MUTEX_UNLOCK (queue);
3139
3140         /* when using a temp file, we eat the event */
3141         res = TRUE;
3142         gst_event_unref (event);
3143       }
3144       break;
3145     case GST_EVENT_FLUSH_STOP:
3146       if (QUEUE_IS_USING_QUEUE (queue)) {
3147         /* just forward upstream */
3148         res = gst_pad_push_event (queue->sinkpad, event);
3149       } else {
3150         /* now unblock the getrange function */
3151         GST_QUEUE2_MUTEX_LOCK (queue);
3152         queue->srcresult = GST_FLOW_OK;
3153         GST_QUEUE2_MUTEX_UNLOCK (queue);
3154
3155         /* when using a temp file, we eat the event */
3156         res = TRUE;
3157         gst_event_unref (event);
3158       }
3159       break;
3160     case GST_EVENT_RECONFIGURE:
3161       GST_QUEUE2_MUTEX_LOCK (queue);
3162       /* assume downstream is linked now and try to push again */
3163       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
3164         queue->srcresult = GST_FLOW_OK;
3165         queue->sinkresult = GST_FLOW_OK;
3166         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
3167           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
3168               NULL);
3169         }
3170       }
3171       GST_QUEUE2_MUTEX_UNLOCK (queue);
3172
3173       res = gst_pad_push_event (queue->sinkpad, event);
3174       break;
3175     default:
3176       res = gst_pad_push_event (queue->sinkpad, event);
3177       break;
3178   }
3179
3180   return res;
3181 }
3182
3183 static gboolean
3184 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3185 {
3186   GstQueue2 *queue;
3187
3188   queue = GST_QUEUE2 (parent);
3189
3190   switch (GST_QUERY_TYPE (query)) {
3191     case GST_QUERY_POSITION:
3192     {
3193       gint64 peer_pos;
3194       GstFormat format;
3195
3196       if (!gst_pad_peer_query (queue->sinkpad, query))
3197         goto peer_failed;
3198
3199       /* get peer position */
3200       gst_query_parse_position (query, &format, &peer_pos);
3201
3202       /* FIXME: this code assumes that there's no discont in the queue */
3203       switch (format) {
3204         case GST_FORMAT_BYTES:
3205           peer_pos -= queue->cur_level.bytes;
3206           if (peer_pos < 0)     /* Clamp result to 0 */
3207             peer_pos = 0;
3208           break;
3209         case GST_FORMAT_TIME:
3210           peer_pos -= queue->cur_level.time;
3211           if (peer_pos < 0)     /* Clamp result to 0 */
3212             peer_pos = 0;
3213           break;
3214         default:
3215           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3216               "know how to adjust value", gst_format_get_name (format));
3217           return FALSE;
3218       }
3219       /* set updated position */
3220       gst_query_set_position (query, format, peer_pos);
3221       break;
3222     }
3223     case GST_QUERY_DURATION:
3224     {
3225       GST_DEBUG_OBJECT (queue, "doing peer query");
3226
3227       if (!gst_pad_peer_query (queue->sinkpad, query))
3228         goto peer_failed;
3229
3230       GST_DEBUG_OBJECT (queue, "peer query success");
3231       break;
3232     }
3233     case GST_QUERY_BUFFERING:
3234     {
3235       gint percent;
3236       gboolean is_buffering;
3237       GstBufferingMode mode;
3238       gint avg_in, avg_out;
3239       gint64 buffering_left;
3240
3241       GST_DEBUG_OBJECT (queue, "query buffering");
3242
3243       get_buffering_level (queue, &is_buffering, &percent);
3244       percent = convert_to_buffering_percent (queue, percent);
3245       gst_query_set_buffering_percent (query, is_buffering, percent);
3246
3247       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3248           &buffering_left);
3249       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3250           buffering_left);
3251
3252       if (!QUEUE_IS_USING_QUEUE (queue)) {
3253         /* add ranges for download and ringbuffer buffering */
3254         GstFormat format;
3255         gint64 start, stop, range_start, range_stop;
3256         guint64 writing_pos;
3257         gint64 estimated_total;
3258         gint64 duration;
3259         gboolean peer_res, is_eos;
3260         GstQueue2Range *queued_ranges;
3261
3262         /* we need a current download region */
3263         if (queue->current == NULL)
3264           return FALSE;
3265
3266         writing_pos = queue->current->writing_pos;
3267         is_eos = queue->is_eos;
3268
3269         if (is_eos) {
3270           /* we're EOS, we know the duration in bytes now */
3271           peer_res = TRUE;
3272           duration = writing_pos;
3273         } else {
3274           /* get duration of upstream in bytes */
3275           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3276               GST_FORMAT_BYTES, &duration);
3277         }
3278
3279         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3280             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3281
3282         /* calculate remaining and total download time */
3283         if (peer_res && avg_in > 0.0)
3284           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3285         else
3286           estimated_total = -1;
3287
3288         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3289             estimated_total);
3290
3291         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3292
3293         switch (format) {
3294           case GST_FORMAT_PERCENT:
3295             /* we need duration */
3296             if (!peer_res)
3297               goto peer_failed;
3298
3299             start = 0;
3300             /* get our available data relative to the duration */
3301             if (duration != -1)
3302               stop =
3303                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3304                   duration);
3305             else
3306               stop = -1;
3307             break;
3308           case GST_FORMAT_BYTES:
3309             start = 0;
3310             stop = writing_pos;
3311             break;
3312           default:
3313             start = -1;
3314             stop = -1;
3315             break;
3316         }
3317
3318         /* fill out the buffered ranges */
3319         for (queued_ranges = queue->ranges; queued_ranges;
3320             queued_ranges = queued_ranges->next) {
3321           switch (format) {
3322             case GST_FORMAT_PERCENT:
3323               if (duration == -1) {
3324                 range_start = 0;
3325                 range_stop = 0;
3326                 break;
3327               }
3328               range_start =
3329                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3330                   queued_ranges->offset, duration);
3331               range_stop =
3332                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3333                   queued_ranges->writing_pos, duration);
3334               break;
3335             case GST_FORMAT_BYTES:
3336               range_start = queued_ranges->offset;
3337               range_stop = queued_ranges->writing_pos;
3338               break;
3339             default:
3340               range_start = -1;
3341               range_stop = -1;
3342               break;
3343           }
3344           if (range_start == range_stop)
3345             continue;
3346           GST_DEBUG_OBJECT (queue,
3347               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3348               G_GINT64_FORMAT, range_start, range_stop);
3349           gst_query_add_buffering_range (query, range_start, range_stop);
3350         }
3351
3352         gst_query_set_buffering_range (query, format, start, stop,
3353             estimated_total);
3354       }
3355       break;
3356     }
3357     case GST_QUERY_SCHEDULING:
3358     {
3359       gboolean pull_mode;
3360       GstSchedulingFlags flags = 0;
3361
3362       if (!gst_pad_peer_query (queue->sinkpad, query))
3363         goto peer_failed;
3364
3365       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3366
3367       /* we can operate in pull mode when we are using a tempfile */
3368       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3369
3370       if (pull_mode)
3371         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3372       gst_query_set_scheduling (query, flags, 0, -1, 0);
3373       if (pull_mode)
3374         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3375       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3376       break;
3377     }
3378     default:
3379       /* peer handled other queries */
3380       if (!gst_pad_query_default (pad, parent, query))
3381         goto peer_failed;
3382       break;
3383   }
3384
3385   return TRUE;
3386
3387   /* ERRORS */
3388 peer_failed:
3389   {
3390     GST_DEBUG_OBJECT (queue, "failed peer query");
3391     return FALSE;
3392   }
3393 }
3394
3395 static gboolean
3396 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3397 {
3398   GstQueue2 *queue = GST_QUEUE2 (element);
3399
3400   /* simply forward to the srcpad query function */
3401   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3402       query);
3403 }
3404
3405 static void
3406 gst_queue2_update_upstream_size (GstQueue2 * queue)
3407 {
3408   gint64 upstream_size = -1;
3409
3410   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3411           &upstream_size)) {
3412     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3413
3414     /* upstream_size can be negative but queue->upstream_size is unsigned.
3415      * Prevent setting negative values to it (the query can return -1) */
3416     if (upstream_size >= 0)
3417       queue->upstream_size = upstream_size;
3418     else
3419       queue->upstream_size = 0;
3420   }
3421 }
3422
3423 static GstFlowReturn
3424 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3425     guint length, GstBuffer ** buffer)
3426 {
3427   GstQueue2 *queue;
3428   GstFlowReturn ret;
3429
3430   queue = GST_QUEUE2_CAST (parent);
3431
3432   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3433   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3434   offset = (offset == -1) ? queue->current->reading_pos : offset;
3435
3436   GST_DEBUG_OBJECT (queue,
3437       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3438
3439   /* catch any reads beyond the size of the file here to make sure queue2
3440    * doesn't send seek events beyond the size of the file upstream, since
3441    * that would confuse elements such as souphttpsrc and/or http servers.
3442    * Demuxers often just loop until EOS at the end of the file to figure out
3443    * when they've read all the end-headers or index chunks. */
3444   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3445     gst_queue2_update_upstream_size (queue);
3446     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3447       goto out_unexpected;
3448   }
3449
3450   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3451     gst_queue2_update_upstream_size (queue);
3452     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3453       length = queue->upstream_size - offset;
3454       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3455     }
3456   }
3457
3458   /* FIXME - function will block when the range is not yet available */
3459   ret = gst_queue2_create_read (queue, offset, length, buffer);
3460   GST_QUEUE2_MUTEX_UNLOCK (queue);
3461   gst_queue2_post_buffering (queue);
3462
3463   return ret;
3464
3465   /* ERRORS */
3466 out_flushing:
3467   {
3468     ret = queue->srcresult;
3469
3470     GST_DEBUG_OBJECT (queue, "we are %s", gst_flow_get_name (ret));
3471     GST_QUEUE2_MUTEX_UNLOCK (queue);
3472     return ret;
3473   }
3474 out_unexpected:
3475   {
3476     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3477     GST_QUEUE2_MUTEX_UNLOCK (queue);
3478     return GST_FLOW_EOS;
3479   }
3480 }
3481
3482 /* sink currently only operates in push mode */
3483 static gboolean
3484 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3485     GstPadMode mode, gboolean active)
3486 {
3487   gboolean result;
3488   GstQueue2 *queue;
3489
3490   queue = GST_QUEUE2 (parent);
3491
3492   switch (mode) {
3493     case GST_PAD_MODE_PUSH:
3494       if (active) {
3495         GST_QUEUE2_MUTEX_LOCK (queue);
3496         GST_DEBUG_OBJECT (queue, "activating push mode");
3497         queue->srcresult = GST_FLOW_OK;
3498         queue->sinkresult = GST_FLOW_OK;
3499         queue->is_eos = FALSE;
3500         queue->unexpected = FALSE;
3501         reset_rate_timer (queue);
3502         GST_QUEUE2_MUTEX_UNLOCK (queue);
3503       } else {
3504         /* unblock chain function */
3505         GST_QUEUE2_MUTEX_LOCK (queue);
3506         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3507         queue->srcresult = GST_FLOW_FLUSHING;
3508         queue->sinkresult = GST_FLOW_FLUSHING;
3509         GST_QUEUE2_SIGNAL_DEL (queue);
3510         GST_QUEUE2_MUTEX_UNLOCK (queue);
3511
3512         /* wait until it is unblocked and clean up */
3513         GST_PAD_STREAM_LOCK (pad);
3514         GST_QUEUE2_MUTEX_LOCK (queue);
3515         gst_queue2_locked_flush (queue, TRUE, FALSE);
3516         GST_QUEUE2_MUTEX_UNLOCK (queue);
3517         GST_PAD_STREAM_UNLOCK (pad);
3518       }
3519       result = TRUE;
3520       break;
3521     default:
3522       result = FALSE;
3523       break;
3524   }
3525   return result;
3526 }
3527
3528 /* src operating in push mode, we start a task on the source pad that pushes out
3529  * buffers from the queue */
3530 static gboolean
3531 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3532 {
3533   gboolean result = FALSE;
3534   GstQueue2 *queue;
3535
3536   queue = GST_QUEUE2 (parent);
3537
3538   if (active) {
3539     GST_QUEUE2_MUTEX_LOCK (queue);
3540     GST_DEBUG_OBJECT (queue, "activating push mode");
3541     queue->srcresult = GST_FLOW_OK;
3542     queue->sinkresult = GST_FLOW_OK;
3543     queue->is_eos = FALSE;
3544     queue->unexpected = FALSE;
3545     result =
3546         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3547     GST_QUEUE2_MUTEX_UNLOCK (queue);
3548   } else {
3549     /* unblock loop function */
3550     GST_QUEUE2_MUTEX_LOCK (queue);
3551     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3552     queue->srcresult = GST_FLOW_FLUSHING;
3553     queue->sinkresult = GST_FLOW_FLUSHING;
3554     /* the item add signal will unblock */
3555     GST_QUEUE2_SIGNAL_ADD (queue);
3556     GST_QUEUE2_MUTEX_UNLOCK (queue);
3557
3558     /* step 2, make sure streaming finishes */
3559     result = gst_pad_stop_task (pad);
3560
3561     GST_QUEUE2_MUTEX_LOCK (queue);
3562     gst_queue2_locked_flush (queue, FALSE, FALSE);
3563     GST_QUEUE2_MUTEX_UNLOCK (queue);
3564   }
3565
3566   return result;
3567 }
3568
3569 /* pull mode, downstream will call our getrange function */
3570 static gboolean
3571 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3572 {
3573   gboolean result;
3574   GstQueue2 *queue;
3575
3576   queue = GST_QUEUE2 (parent);
3577
3578   if (active) {
3579     GST_QUEUE2_MUTEX_LOCK (queue);
3580     if (!QUEUE_IS_USING_QUEUE (queue)) {
3581       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3582         /* open the temp file now */
3583         result = gst_queue2_open_temp_location_file (queue);
3584       } else if (!queue->ring_buffer) {
3585         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3586         result = ! !queue->ring_buffer;
3587       } else {
3588         result = TRUE;
3589       }
3590
3591       GST_DEBUG_OBJECT (queue, "activating pull mode");
3592       init_ranges (queue);
3593       queue->srcresult = GST_FLOW_OK;
3594       queue->sinkresult = GST_FLOW_OK;
3595       queue->is_eos = FALSE;
3596       queue->unexpected = FALSE;
3597       queue->upstream_size = 0;
3598     } else {
3599       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3600       /* this is not allowed, we cannot operate in pull mode without a temp
3601        * file. */
3602       queue->srcresult = GST_FLOW_FLUSHING;
3603       queue->sinkresult = GST_FLOW_FLUSHING;
3604       result = FALSE;
3605     }
3606     GST_QUEUE2_MUTEX_UNLOCK (queue);
3607   } else {
3608     GST_QUEUE2_MUTEX_LOCK (queue);
3609     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3610     queue->srcresult = GST_FLOW_FLUSHING;
3611     queue->sinkresult = GST_FLOW_FLUSHING;
3612     /* this will unlock getrange */
3613     GST_QUEUE2_SIGNAL_ADD (queue);
3614     result = TRUE;
3615     GST_QUEUE2_MUTEX_UNLOCK (queue);
3616   }
3617
3618   return result;
3619 }
3620
3621 static gboolean
3622 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3623     gboolean active)
3624 {
3625   gboolean res;
3626
3627   switch (mode) {
3628     case GST_PAD_MODE_PULL:
3629       res = gst_queue2_src_activate_pull (pad, parent, active);
3630       break;
3631     case GST_PAD_MODE_PUSH:
3632       res = gst_queue2_src_activate_push (pad, parent, active);
3633       break;
3634     default:
3635       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3636       res = FALSE;
3637       break;
3638   }
3639   return res;
3640 }
3641
3642 static GstStateChangeReturn
3643 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3644 {
3645   GstQueue2 *queue;
3646   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3647
3648   queue = GST_QUEUE2 (element);
3649
3650   switch (transition) {
3651     case GST_STATE_CHANGE_NULL_TO_READY:
3652       break;
3653     case GST_STATE_CHANGE_READY_TO_PAUSED:
3654       GST_QUEUE2_MUTEX_LOCK (queue);
3655       if (!QUEUE_IS_USING_QUEUE (queue)) {
3656         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3657           if (!gst_queue2_open_temp_location_file (queue))
3658             ret = GST_STATE_CHANGE_FAILURE;
3659         } else {
3660           if (queue->ring_buffer) {
3661             g_free (queue->ring_buffer);
3662             queue->ring_buffer = NULL;
3663           }
3664           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3665             ret = GST_STATE_CHANGE_FAILURE;
3666         }
3667         init_ranges (queue);
3668       }
3669       queue->segment_event_received = FALSE;
3670       queue->starting_segment = NULL;
3671       gst_event_replace (&queue->stream_start_event, NULL);
3672       GST_QUEUE2_MUTEX_UNLOCK (queue);
3673       break;
3674     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3675       break;
3676     default:
3677       break;
3678   }
3679
3680   if (ret == GST_STATE_CHANGE_FAILURE)
3681     return ret;
3682
3683   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3684
3685   if (ret == GST_STATE_CHANGE_FAILURE)
3686     return ret;
3687
3688   switch (transition) {
3689     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3690       break;
3691     case GST_STATE_CHANGE_PAUSED_TO_READY:
3692       GST_QUEUE2_MUTEX_LOCK (queue);
3693       if (!QUEUE_IS_USING_QUEUE (queue)) {
3694         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3695           gst_queue2_close_temp_location_file (queue);
3696         } else if (queue->ring_buffer) {
3697           g_free (queue->ring_buffer);
3698           queue->ring_buffer = NULL;
3699         }
3700         clean_ranges (queue);
3701       }
3702       if (queue->starting_segment != NULL) {
3703         gst_event_unref (queue->starting_segment);
3704         queue->starting_segment = NULL;
3705       }
3706       gst_event_replace (&queue->stream_start_event, NULL);
3707       GST_QUEUE2_MUTEX_UNLOCK (queue);
3708       break;
3709     case GST_STATE_CHANGE_READY_TO_NULL:
3710       break;
3711     default:
3712       break;
3713   }
3714
3715   return ret;
3716 }
3717
3718 /* changing the capacity of the queue must wake up
3719  * the _chain function, it might have more room now
3720  * to store the buffer/event in the queue */
3721 #define QUEUE_CAPACITY_CHANGE(q) \
3722   GST_QUEUE2_SIGNAL_DEL (queue); \
3723   if (queue->use_buffering)      \
3724     update_buffering (queue);
3725
3726 /* Changing the minimum required fill level must
3727  * wake up the _loop function as it might now
3728  * be able to preceed.
3729  */
3730 #define QUEUE_THRESHOLD_CHANGE(q)\
3731   GST_QUEUE2_SIGNAL_ADD (queue);
3732
3733 static void
3734 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3735 {
3736   GstState state;
3737
3738   /* the element must be stopped in order to do this */
3739   GST_OBJECT_LOCK (queue);
3740   state = GST_STATE (queue);
3741   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3742     goto wrong_state;
3743   GST_OBJECT_UNLOCK (queue);
3744
3745   /* set new location */
3746   g_free (queue->temp_template);
3747   queue->temp_template = g_strdup (template);
3748
3749   return;
3750
3751 /* ERROR */
3752 wrong_state:
3753   {
3754     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3755     GST_OBJECT_UNLOCK (queue);
3756   }
3757 }
3758
3759 static void
3760 gst_queue2_set_property (GObject * object,
3761     guint prop_id, const GValue * value, GParamSpec * pspec)
3762 {
3763   GstQueue2 *queue = GST_QUEUE2 (object);
3764
3765   /* someone could change levels here, and since this
3766    * affects the get/put funcs, we need to lock for safety. */
3767   GST_QUEUE2_MUTEX_LOCK (queue);
3768
3769   switch (prop_id) {
3770     case PROP_MAX_SIZE_BYTES:
3771       queue->max_level.bytes = g_value_get_uint (value);
3772       QUEUE_CAPACITY_CHANGE (queue);
3773       break;
3774     case PROP_MAX_SIZE_BUFFERS:
3775       queue->max_level.buffers = g_value_get_uint (value);
3776       QUEUE_CAPACITY_CHANGE (queue);
3777       break;
3778     case PROP_MAX_SIZE_TIME:
3779       queue->max_level.time = g_value_get_uint64 (value);
3780       /* set rate_time to the same value. We use an extra field in the level
3781        * structure so that we can easily access and compare it */
3782       queue->max_level.rate_time = queue->max_level.time;
3783       QUEUE_CAPACITY_CHANGE (queue);
3784       break;
3785     case PROP_USE_BUFFERING:
3786       queue->use_buffering = g_value_get_boolean (value);
3787       if (!queue->use_buffering && queue->is_buffering) {
3788         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3789             "posting 100%% message");
3790         SET_PERCENT (queue, 100);
3791         queue->is_buffering = FALSE;
3792       }
3793
3794       if (queue->use_buffering) {
3795         queue->is_buffering = TRUE;
3796         update_buffering (queue);
3797       }
3798       break;
3799     case PROP_USE_TAGS_BITRATE:
3800       queue->use_tags_bitrate = g_value_get_boolean (value);
3801       break;
3802     case PROP_USE_RATE_ESTIMATE:
3803       queue->use_rate_estimate = g_value_get_boolean (value);
3804       break;
3805     case PROP_LOW_PERCENT:
3806       queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3807       if (queue->is_buffering)
3808         update_buffering (queue);
3809       break;
3810     case PROP_HIGH_PERCENT:
3811       queue->high_watermark =
3812           g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3813       if (queue->is_buffering)
3814         update_buffering (queue);
3815       break;
3816     case PROP_LOW_WATERMARK:
3817       queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3818       if (queue->is_buffering)
3819         update_buffering (queue);
3820       break;
3821     case PROP_HIGH_WATERMARK:
3822       queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3823       if (queue->is_buffering)
3824         update_buffering (queue);
3825       break;
3826     case PROP_TEMP_TEMPLATE:
3827       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3828       break;
3829     case PROP_TEMP_REMOVE:
3830       queue->temp_remove = g_value_get_boolean (value);
3831       break;
3832     case PROP_RING_BUFFER_MAX_SIZE:
3833       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3834       break;
3835     default:
3836       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3837       break;
3838   }
3839
3840   GST_QUEUE2_MUTEX_UNLOCK (queue);
3841   gst_queue2_post_buffering (queue);
3842 }
3843
3844 static void
3845 gst_queue2_get_property (GObject * object,
3846     guint prop_id, GValue * value, GParamSpec * pspec)
3847 {
3848   GstQueue2 *queue = GST_QUEUE2 (object);
3849
3850   GST_QUEUE2_MUTEX_LOCK (queue);
3851
3852   switch (prop_id) {
3853     case PROP_CUR_LEVEL_BYTES:
3854       g_value_set_uint (value, queue->cur_level.bytes);
3855       break;
3856     case PROP_CUR_LEVEL_BUFFERS:
3857       g_value_set_uint (value, queue->cur_level.buffers);
3858       break;
3859     case PROP_CUR_LEVEL_TIME:
3860       g_value_set_uint64 (value, queue->cur_level.time);
3861       break;
3862     case PROP_MAX_SIZE_BYTES:
3863       g_value_set_uint (value, queue->max_level.bytes);
3864       break;
3865     case PROP_MAX_SIZE_BUFFERS:
3866       g_value_set_uint (value, queue->max_level.buffers);
3867       break;
3868     case PROP_MAX_SIZE_TIME:
3869       g_value_set_uint64 (value, queue->max_level.time);
3870       break;
3871     case PROP_USE_BUFFERING:
3872       g_value_set_boolean (value, queue->use_buffering);
3873       break;
3874     case PROP_USE_TAGS_BITRATE:
3875       g_value_set_boolean (value, queue->use_tags_bitrate);
3876       break;
3877     case PROP_USE_RATE_ESTIMATE:
3878       g_value_set_boolean (value, queue->use_rate_estimate);
3879       break;
3880     case PROP_LOW_PERCENT:
3881       g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
3882       break;
3883     case PROP_HIGH_PERCENT:
3884       g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
3885       break;
3886     case PROP_LOW_WATERMARK:
3887       g_value_set_double (value, queue->low_watermark /
3888           (gdouble) MAX_BUFFERING_LEVEL);
3889       break;
3890     case PROP_HIGH_WATERMARK:
3891       g_value_set_double (value, queue->high_watermark /
3892           (gdouble) MAX_BUFFERING_LEVEL);
3893       break;
3894     case PROP_TEMP_TEMPLATE:
3895       g_value_set_string (value, queue->temp_template);
3896       break;
3897     case PROP_TEMP_LOCATION:
3898       g_value_set_string (value, queue->temp_location);
3899       break;
3900     case PROP_TEMP_REMOVE:
3901       g_value_set_boolean (value, queue->temp_remove);
3902       break;
3903     case PROP_RING_BUFFER_MAX_SIZE:
3904       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3905       break;
3906     case PROP_AVG_IN_RATE:
3907     {
3908       gdouble in_rate = queue->byte_in_rate;
3909
3910       /* During the first RATE_INTERVAL, byte_in_rate will not have been
3911        * calculated, so calculate it here. */
3912       if (in_rate == 0.0 && queue->bytes_in
3913           && queue->last_update_in_rates_elapsed > 0.0)
3914         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
3915
3916       g_value_set_int64 (value, (gint64) in_rate);
3917       break;
3918     }
3919     default:
3920       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3921       break;
3922   }
3923
3924   GST_QUEUE2_MUTEX_UNLOCK (queue);
3925 }