queue2: Update buffering if its enabled and low/high watermarks are changed
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue2:current-level-buffers property.
40  *
41  * The default queue size limits are 100 buffers, 2MB of data, or
42  * two seconds worth of data, whichever is reached first.
43  *
44  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
45  * will allocate a random free filename and buffer data in the file.
46  * By using this, it will buffer the entire stream data on the file independently
47  * of the queue size limits, they will only be used for buffering statistics.
48  *
49  * The temp-location property will be used to notify the application of the
50  * allocated filename.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56
57 #include "gstqueue2.h"
58
59 #include <glib/gstdio.h>
60
61 #include "gst/gst-i18n-lib.h"
62 #include "gst/glib-compat-private.h"
63
64 #include <string.h>
65
66 #ifdef G_OS_WIN32
67 #include <io.h>                 /* lseek, open, close, read */
68 #undef lseek
69 #define lseek _lseeki64
70 #undef off_t
71 #define off_t guint64
72 #else
73 #include <unistd.h>
74 #endif
75
76 #ifdef __BIONIC__               /* Android */
77 #undef lseek
78 #define lseek lseek64
79 #undef off_t
80 #define off_t guint64
81 #include <fcntl.h>
82 #endif
83
84 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
85     GST_PAD_SINK,
86     GST_PAD_ALWAYS,
87     GST_STATIC_CAPS_ANY);
88
89 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
90     GST_PAD_SRC,
91     GST_PAD_ALWAYS,
92     GST_STATIC_CAPS_ANY);
93
94 GST_DEBUG_CATEGORY_STATIC (queue_debug);
95 #define GST_CAT_DEFAULT (queue_debug)
96 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
97
98 enum
99 {
100   LAST_SIGNAL
101 };
102
103 /* other defines */
104 #define DEFAULT_BUFFER_SIZE 4096
105 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
106 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
107 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
108
109 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
110
111 /* default property values */
112 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
113 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
114 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
115 #define DEFAULT_USE_BUFFERING      FALSE
116 #define DEFAULT_USE_TAGS_BITRATE   FALSE
117 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
118 #define DEFAULT_LOW_PERCENT        10
119 #define DEFAULT_HIGH_PERCENT       99
120 #define DEFAULT_LOW_WATERMARK      0.01
121 #define DEFAULT_HIGH_WATERMARK     0.99
122 #define DEFAULT_TEMP_REMOVE        TRUE
123 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
124
125 enum
126 {
127   PROP_0,
128   PROP_CUR_LEVEL_BUFFERS,
129   PROP_CUR_LEVEL_BYTES,
130   PROP_CUR_LEVEL_TIME,
131   PROP_MAX_SIZE_BUFFERS,
132   PROP_MAX_SIZE_BYTES,
133   PROP_MAX_SIZE_TIME,
134   PROP_USE_BUFFERING,
135   PROP_USE_TAGS_BITRATE,
136   PROP_USE_RATE_ESTIMATE,
137   PROP_LOW_PERCENT,
138   PROP_HIGH_PERCENT,
139   PROP_LOW_WATERMARK,
140   PROP_HIGH_WATERMARK,
141   PROP_TEMP_TEMPLATE,
142   PROP_TEMP_LOCATION,
143   PROP_TEMP_REMOVE,
144   PROP_RING_BUFFER_MAX_SIZE,
145   PROP_AVG_IN_RATE,
146   PROP_LAST
147 };
148
149 /* Explanation for buffer levels and percentages:
150  *
151  * The buffering_level functions here return a value in a normalized range
152  * that specifies the queue's current fill level. The range goes from 0 to
153  * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
154  *
155  * This is not to be confused with the buffering_percent value, which is
156  * a *relative* quantity - relative to the low/high watermarks.
157  * buffering_percent = 0% means buffering_level is at the low watermark.
158  * buffering_percent = 100% means buffering_level is at the high watermark.
159  * buffering_percent is used for determining if the fill level has reached
160  * the high watermark, and for producing BUFFERING messages. This value
161  * always uses a 0..100 range (since it is a percentage).
162  *
163  * To avoid future confusions, whenever "buffering level" is mentioned, it
164  * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
165  * range. Whenever "buffering_percent" is mentioned, it refers to the
166  * percentage value that is relative to the low/high watermark. */
167
168 /* Using a buffering level range of 0..1000000 to allow for a
169  * resolution in ppm (1 ppm = 0.0001%) */
170 #define MAX_BUFFERING_LEVEL 1000000
171
172 /* How much 1% makes up in the buffer level range */
173 #define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
174
175 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
176   l.buffers = 0;                                        \
177   l.bytes = 0;                                          \
178   l.time = 0;                                           \
179   l.rate_time = 0;                                      \
180 } G_STMT_END
181
182 #define STATUS(queue, pad, msg) \
183   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
184                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
185                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
186                       " ns, %"G_GUINT64_FORMAT" items", \
187                       GST_DEBUG_PAD_NAME (pad), \
188                       queue->cur_level.buffers, \
189                       queue->max_level.buffers, \
190                       queue->cur_level.bytes, \
191                       queue->max_level.bytes, \
192                       queue->cur_level.time, \
193                       queue->max_level.time, \
194                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
195                         queue->current->writing_pos - queue->current->max_reading_pos : \
196                         queue->queue.length))
197
198 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
199   g_mutex_lock (&q->qlock);                                              \
200 } G_STMT_END
201
202 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
203   GST_QUEUE2_MUTEX_LOCK (q);                                            \
204   if (res != GST_FLOW_OK)                                               \
205     goto label;                                                         \
206 } G_STMT_END
207
208 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
209   g_mutex_unlock (&q->qlock);                                            \
210 } G_STMT_END
211
212 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
213   STATUS (queue, q->sinkpad, "wait for DEL");                           \
214   q->waiting_del = TRUE;                                                \
215   g_cond_wait (&q->item_del, &queue->qlock);                              \
216   q->waiting_del = FALSE;                                               \
217   if (res != GST_FLOW_OK) {                                             \
218     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
219     goto label;                                                         \
220   }                                                                     \
221   STATUS (queue, q->sinkpad, "received DEL");                           \
222 } G_STMT_END
223
224 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
225   STATUS (queue, q->srcpad, "wait for ADD");                            \
226   q->waiting_add = TRUE;                                                \
227   g_cond_wait (&q->item_add, &q->qlock);                                  \
228   q->waiting_add = FALSE;                                               \
229   if (res != GST_FLOW_OK) {                                             \
230     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
231     goto label;                                                         \
232   }                                                                     \
233   STATUS (queue, q->srcpad, "received ADD");                            \
234 } G_STMT_END
235
236 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
237   if (q->waiting_del) {                                                 \
238     STATUS (q, q->srcpad, "signal DEL");                                \
239     g_cond_signal (&q->item_del);                                        \
240   }                                                                     \
241 } G_STMT_END
242
243 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
244   if (q->waiting_add) {                                                 \
245     STATUS (q, q->sinkpad, "signal ADD");                               \
246     g_cond_signal (&q->item_add);                                        \
247   }                                                                     \
248 } G_STMT_END
249
250 #define SET_PERCENT(q, perc) G_STMT_START {                              \
251   if (perc != q->buffering_percent) {                                    \
252     q->buffering_percent = perc;                                         \
253     q->percent_changed = TRUE;                                           \
254     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
255     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
256         &q->buffering_left);                                             \
257   }                                                                      \
258 } G_STMT_END
259
260 #define _do_init \
261     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
262     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
263         "dataflow inside the queue element");
264 #define gst_queue2_parent_class parent_class
265 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
266
267 static void gst_queue2_finalize (GObject * object);
268
269 static void gst_queue2_set_property (GObject * object,
270     guint prop_id, const GValue * value, GParamSpec * pspec);
271 static void gst_queue2_get_property (GObject * object,
272     guint prop_id, GValue * value, GParamSpec * pspec);
273
274 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
275     GstBuffer * buffer);
276 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
277     GstBufferList * buffer_list);
278 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
279 static void gst_queue2_loop (GstPad * pad);
280
281 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
282     GstEvent * event);
283 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
284     GstQuery * query);
285
286 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
287     GstEvent * event);
288 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
289     GstQuery * query);
290 static gboolean gst_queue2_handle_query (GstElement * element,
291     GstQuery * query);
292
293 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
294     guint64 offset, guint length, GstBuffer ** buffer);
295
296 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
297     GstPadMode mode, gboolean active);
298 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
299     GstPadMode mode, gboolean active);
300 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
301     GstStateChange transition);
302
303 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
304 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
305
306 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
307 static void update_in_rates (GstQueue2 * queue, gboolean force);
308 static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue);
309 static void gst_queue2_post_buffering (GstQueue2 * queue);
310
311 typedef enum
312 {
313   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
314   GST_QUEUE2_ITEM_TYPE_BUFFER,
315   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
316   GST_QUEUE2_ITEM_TYPE_EVENT,
317   GST_QUEUE2_ITEM_TYPE_QUERY
318 } GstQueue2ItemType;
319
320 typedef struct
321 {
322   GstQueue2ItemType type;
323   GstMiniObject *item;
324 } GstQueue2Item;
325
326 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
327
328 static void
329 gst_queue2_class_init (GstQueue2Class * klass)
330 {
331   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
332   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
333
334   gobject_class->set_property = gst_queue2_set_property;
335   gobject_class->get_property = gst_queue2_get_property;
336
337   /* properties */
338   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
339       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
340           "Current amount of data in the queue (bytes)",
341           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
343       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
344           "Current number of buffers in the queue",
345           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
346   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
347       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
348           "Current amount of data in the queue (in ns)",
349           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
350
351   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
352       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
353           "Max. amount of data in the queue (bytes, 0=disable)",
354           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
355           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
356           G_PARAM_STATIC_STRINGS));
357   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
358       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
359           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
360           DEFAULT_MAX_SIZE_BUFFERS,
361           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
362           G_PARAM_STATIC_STRINGS));
363   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
364       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
365           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
366           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
367           G_PARAM_STATIC_STRINGS));
368
369   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
370       g_param_spec_boolean ("use-buffering", "Use buffering",
371           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
372           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
373           G_PARAM_STATIC_STRINGS));
374   g_object_class_install_property (gobject_class, PROP_USE_TAGS_BITRATE,
375       g_param_spec_boolean ("use-tags-bitrate", "Use bitrate from tags",
376           "Use a bitrate from upstream tags to estimate buffer duration if not provided",
377           DEFAULT_USE_TAGS_BITRATE,
378           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
379           G_PARAM_STATIC_STRINGS));
380   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
381       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
382           "Estimate the bitrate of the stream to calculate time level",
383           DEFAULT_USE_RATE_ESTIMATE,
384           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
385   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
386       g_param_spec_int ("low-percent", "Low percent",
387           "Low threshold for buffering to start. Only used if use-buffering is True "
388           "(Deprecated: use low-watermark instead)",
389           0, 100, DEFAULT_LOW_WATERMARK * 100,
390           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
391   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
392       g_param_spec_int ("high-percent", "High percent",
393           "High threshold for buffering to finish. Only used if use-buffering is True "
394           "(Deprecated: use high-watermark instead)",
395           0, 100, DEFAULT_HIGH_WATERMARK * 100,
396           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
397   g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
398       g_param_spec_double ("low-watermark", "Low watermark",
399           "Low threshold for buffering to start. Only used if use-buffering is True",
400           0.0, 1.0, DEFAULT_LOW_WATERMARK,
401           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
402   g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
403       g_param_spec_double ("high-watermark", "High watermark",
404           "High threshold for buffering to finish. Only used if use-buffering is True",
405           0.0, 1.0, DEFAULT_HIGH_WATERMARK,
406           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
407
408   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
409       g_param_spec_string ("temp-template", "Temporary File Template",
410           "File template to store temporary files in, should contain directory "
411           "and XXXXXX. (NULL == disabled)",
412           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413
414   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
415       g_param_spec_string ("temp-location", "Temporary File Location",
416           "Location to store temporary files in (Only read this property, "
417           "use temp-template to configure the name template)",
418           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
419
420   /**
421    * GstQueue2:temp-remove
422    *
423    * When temp-template is set, remove the temporary file when going to READY.
424    */
425   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
426       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
427           "Remove the temp-location after use",
428           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
429
430   /**
431    * GstQueue2:ring-buffer-max-size
432    *
433    * The maximum size of the ring buffer in bytes. If set to 0, the ring
434    * buffer is disabled. Default 0.
435    */
436   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
437       g_param_spec_uint64 ("ring-buffer-max-size",
438           "Max. ring buffer size (bytes)",
439           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
440           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
441           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442
443   /**
444    * GstQueue2:avg-in-rate
445    *
446    * The average input data rate.
447    */
448   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
449       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
450           "Average input data rate (bytes/s)",
451           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
452
453   /* set several parent class virtual functions */
454   gobject_class->finalize = gst_queue2_finalize;
455
456   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
457   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
458
459   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
460       "Generic",
461       "Simple data queue",
462       "Erik Walthinsen <omega@cse.ogi.edu>, "
463       "Wim Taymans <wim.taymans@gmail.com>");
464
465   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
466   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
467 }
468
469 static void
470 gst_queue2_init (GstQueue2 * queue)
471 {
472   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
473
474   gst_pad_set_chain_function (queue->sinkpad,
475       GST_DEBUG_FUNCPTR (gst_queue2_chain));
476   gst_pad_set_chain_list_function (queue->sinkpad,
477       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
478   gst_pad_set_activatemode_function (queue->sinkpad,
479       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
480   gst_pad_set_event_function (queue->sinkpad,
481       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
482   gst_pad_set_query_function (queue->sinkpad,
483       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
484   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
485   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
486
487   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
488
489   gst_pad_set_activatemode_function (queue->srcpad,
490       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
491   gst_pad_set_getrange_function (queue->srcpad,
492       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
493   gst_pad_set_event_function (queue->srcpad,
494       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
495   gst_pad_set_query_function (queue->srcpad,
496       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
497   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
498   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
499
500   /* levels */
501   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
502   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
503   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
504   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
505   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
506   queue->use_buffering = DEFAULT_USE_BUFFERING;
507   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
508   queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
509   queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
510
511   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
512   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
513
514   queue->sinktime = GST_CLOCK_TIME_NONE;
515   queue->srctime = GST_CLOCK_TIME_NONE;
516   queue->sink_tainted = TRUE;
517   queue->src_tainted = TRUE;
518
519   queue->srcresult = GST_FLOW_FLUSHING;
520   queue->sinkresult = GST_FLOW_FLUSHING;
521   queue->is_eos = FALSE;
522   queue->in_timer = g_timer_new ();
523   queue->out_timer = g_timer_new ();
524
525   g_mutex_init (&queue->qlock);
526   queue->waiting_add = FALSE;
527   g_cond_init (&queue->item_add);
528   queue->waiting_del = FALSE;
529   g_cond_init (&queue->item_del);
530   g_queue_init (&queue->queue);
531
532   g_cond_init (&queue->query_handled);
533   queue->last_query = FALSE;
534
535   g_mutex_init (&queue->buffering_post_lock);
536   queue->buffering_percent = 100;
537
538   /* tempfile related */
539   queue->temp_template = NULL;
540   queue->temp_location = NULL;
541   queue->temp_remove = DEFAULT_TEMP_REMOVE;
542
543   queue->ring_buffer = NULL;
544   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
545
546   GST_DEBUG_OBJECT (queue,
547       "initialized queue's not_empty & not_full conditions");
548 }
549
550 /* called only once, as opposed to dispose */
551 static void
552 gst_queue2_finalize (GObject * object)
553 {
554   GstQueue2 *queue = GST_QUEUE2 (object);
555
556   GST_DEBUG_OBJECT (queue, "finalizing queue");
557
558   while (!g_queue_is_empty (&queue->queue)) {
559     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
560
561     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
562       gst_mini_object_unref (qitem->item);
563     g_slice_free (GstQueue2Item, qitem);
564   }
565
566   queue->last_query = FALSE;
567   g_queue_clear (&queue->queue);
568   g_mutex_clear (&queue->qlock);
569   g_mutex_clear (&queue->buffering_post_lock);
570   g_cond_clear (&queue->item_add);
571   g_cond_clear (&queue->item_del);
572   g_cond_clear (&queue->query_handled);
573   g_timer_destroy (queue->in_timer);
574   g_timer_destroy (queue->out_timer);
575
576   /* temp_file path cleanup  */
577   g_free (queue->temp_template);
578   g_free (queue->temp_location);
579
580   G_OBJECT_CLASS (parent_class)->finalize (object);
581 }
582
583 static void
584 debug_ranges (GstQueue2 * queue)
585 {
586   GstQueue2Range *walk;
587
588   for (walk = queue->ranges; walk; walk = walk->next) {
589     GST_DEBUG_OBJECT (queue,
590         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
591         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
592         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
593         walk->rb_writing_pos, walk->reading_pos,
594         walk == queue->current ? "**y**" : "  n  ");
595   }
596 }
597
598 /* clear all the downloaded ranges */
599 static void
600 clean_ranges (GstQueue2 * queue)
601 {
602   GST_DEBUG_OBJECT (queue, "clean queue ranges");
603
604   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
605   queue->ranges = NULL;
606   queue->current = NULL;
607 }
608
609 /* find a range that contains @offset or NULL when nothing does */
610 static GstQueue2Range *
611 find_range (GstQueue2 * queue, guint64 offset)
612 {
613   GstQueue2Range *range = NULL;
614   GstQueue2Range *walk;
615
616   /* first do a quick check for the current range */
617   for (walk = queue->ranges; walk; walk = walk->next) {
618     if (offset >= walk->offset && offset <= walk->writing_pos) {
619       /* we can reuse an existing range */
620       range = walk;
621       break;
622     }
623   }
624   if (range) {
625     GST_DEBUG_OBJECT (queue,
626         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
627         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
628   } else {
629     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
630   }
631   return range;
632 }
633
634 static void
635 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
636 {
637   guint64 max_reading_pos, writing_pos;
638
639   writing_pos = range->writing_pos;
640   max_reading_pos = range->max_reading_pos;
641
642   if (writing_pos > max_reading_pos)
643     queue->cur_level.bytes = writing_pos - max_reading_pos;
644   else
645     queue->cur_level.bytes = 0;
646 }
647
648 /* make a new range for @offset or reuse an existing range */
649 static GstQueue2Range *
650 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
651 {
652   GstQueue2Range *range, *prev, *next;
653
654   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
655
656   if ((range = find_range (queue, offset))) {
657     GST_DEBUG_OBJECT (queue,
658         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
659         range->writing_pos);
660     if (update_existing && range->writing_pos != offset) {
661       GST_DEBUG_OBJECT (queue, "updating range writing position to "
662           "%" G_GUINT64_FORMAT, offset);
663       range->writing_pos = offset;
664     }
665   } else {
666     GST_DEBUG_OBJECT (queue,
667         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
668
669     range = g_slice_new0 (GstQueue2Range);
670     range->offset = offset;
671     /* we want to write to the next location in the ring buffer */
672     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
673     range->writing_pos = offset;
674     range->rb_writing_pos = range->rb_offset;
675     range->reading_pos = offset;
676     range->max_reading_pos = offset;
677
678     /* insert sorted */
679     prev = NULL;
680     next = queue->ranges;
681     while (next) {
682       if (next->offset > offset) {
683         /* insert before next */
684         GST_DEBUG_OBJECT (queue,
685             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
686             next->offset);
687         break;
688       }
689       /* try next */
690       prev = next;
691       next = next->next;
692     }
693     range->next = next;
694     if (prev)
695       prev->next = range;
696     else
697       queue->ranges = range;
698   }
699   debug_ranges (queue);
700
701   /* update the stats for this range */
702   update_cur_level (queue, range);
703
704   return range;
705 }
706
707
708 /* clear and init the download ranges for offset 0 */
709 static void
710 init_ranges (GstQueue2 * queue)
711 {
712   GST_DEBUG_OBJECT (queue, "init queue ranges");
713
714   /* get rid of all the current ranges */
715   clean_ranges (queue);
716   /* make a range for offset 0 */
717   queue->current = add_range (queue, 0, TRUE);
718 }
719
720 /* calculate the diff between running time on the sink and src of the queue.
721  * This is the total amount of time in the queue. */
722 static void
723 update_time_level (GstQueue2 * queue)
724 {
725   if (queue->sink_tainted) {
726     queue->sinktime =
727         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
728         queue->sink_segment.position);
729     queue->sink_tainted = FALSE;
730   }
731
732   if (queue->src_tainted) {
733     queue->srctime =
734         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
735         queue->src_segment.position);
736     queue->src_tainted = FALSE;
737   }
738
739   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
740       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
741
742   if (queue->sinktime != GST_CLOCK_TIME_NONE
743       && queue->srctime != GST_CLOCK_TIME_NONE
744       && queue->sinktime >= queue->srctime)
745     queue->cur_level.time = queue->sinktime - queue->srctime;
746   else
747     queue->cur_level.time = 0;
748 }
749
750 /* take a SEGMENT event and apply the values to segment, updating the time
751  * level of queue. */
752 static void
753 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
754     gboolean is_sink)
755 {
756   gst_event_copy_segment (event, segment);
757
758   if (segment->format == GST_FORMAT_BYTES) {
759     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
760       /* start is where we'll be getting from and as such writing next */
761       queue->current = add_range (queue, segment->start, TRUE);
762     }
763   }
764
765   /* now configure the values, we use these to track timestamps on the
766    * sinkpad. */
767   if (segment->format != GST_FORMAT_TIME) {
768     /* non-time format, pretend the current time segment is closed with a
769      * 0 start and unknown stop time. */
770     segment->format = GST_FORMAT_TIME;
771     segment->start = 0;
772     segment->stop = -1;
773     segment->time = 0;
774   }
775
776   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
777
778   if (is_sink)
779     queue->sink_tainted = TRUE;
780   else
781     queue->src_tainted = TRUE;
782
783   /* segment can update the time level of the queue */
784   update_time_level (queue);
785 }
786
787 static void
788 apply_gap (GstQueue2 * queue, GstEvent * event,
789     GstSegment * segment, gboolean is_sink)
790 {
791   GstClockTime timestamp;
792   GstClockTime duration;
793
794   gst_event_parse_gap (event, &timestamp, &duration);
795
796   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
797
798     if (GST_CLOCK_TIME_IS_VALID (duration)) {
799       timestamp += duration;
800     }
801
802     segment->position = timestamp;
803
804     if (is_sink)
805       queue->sink_tainted = TRUE;
806     else
807       queue->src_tainted = TRUE;
808
809     /* calc diff with other end */
810     update_time_level (queue);
811   }
812 }
813
814 /* take a buffer and update segment, updating the time level of the queue. */
815 static void
816 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
817     guint64 size, gboolean is_sink)
818 {
819   GstClockTime duration, timestamp;
820
821   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
822   duration = GST_BUFFER_DURATION (buffer);
823
824   /* If we have no duration, pick one from the bitrate if we can */
825   if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) {
826     guint bitrate =
827         is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
828     if (bitrate)
829       duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
830   }
831
832   /* if no timestamp is set, assume it's continuous with the previous
833    * time */
834   if (timestamp == GST_CLOCK_TIME_NONE)
835     timestamp = segment->position;
836
837   /* add duration */
838   if (duration != GST_CLOCK_TIME_NONE)
839     timestamp += duration;
840
841   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
842       GST_TIME_ARGS (timestamp));
843
844   segment->position = timestamp;
845
846   if (is_sink)
847     queue->sink_tainted = TRUE;
848   else
849     queue->src_tainted = TRUE;
850
851   /* calc diff with other end */
852   update_time_level (queue);
853 }
854
855 struct BufListData
856 {
857   GstClockTime timestamp;
858   guint bitrate;
859 };
860
861 static gboolean
862 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
863 {
864   struct BufListData *bld = data;
865   GstClockTime *timestamp = &bld->timestamp;
866   GstClockTime btime;
867
868   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
869       " duration %" GST_TIME_FORMAT, idx,
870       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
871       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
872       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
873
874   btime = GST_BUFFER_DTS_OR_PTS (*buf);
875   if (GST_CLOCK_TIME_IS_VALID (btime))
876     *timestamp = btime;
877
878   if (GST_BUFFER_DURATION_IS_VALID (*buf))
879     *timestamp += GST_BUFFER_DURATION (*buf);
880   else if (bld->bitrate != 0) {
881     guint64 size = gst_buffer_get_size (*buf);
882
883     /* If we have no duration, pick one from the bitrate if we can */
884     *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size);
885   }
886
887
888   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
889   return TRUE;
890 }
891
892 /* take a buffer list and update segment, updating the time level of the queue */
893 static void
894 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
895     GstSegment * segment, gboolean is_sink)
896 {
897   struct BufListData bld;
898
899   /* if no timestamp is set, assume it's continuous with the previous time */
900   bld.timestamp = segment->position;
901
902   if (queue->use_tags_bitrate) {
903     if (is_sink)
904       bld.bitrate = queue->sink_tags_bitrate;
905     else
906       bld.bitrate = queue->src_tags_bitrate;
907   } else
908     bld.bitrate = 0;
909
910   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
911
912   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
913       GST_TIME_ARGS (bld.timestamp));
914
915   segment->position = bld.timestamp;
916
917   if (is_sink)
918     queue->sink_tainted = TRUE;
919   else
920     queue->src_tainted = TRUE;
921
922   /* calc diff with other end */
923   update_time_level (queue);
924 }
925
926 static inline gint
927 normalize_to_buffering_level (guint64 cur_level, guint64 max_level,
928     guint64 alt_max)
929 {
930   guint64 p;
931
932   if (max_level == 0)
933     return 0;
934
935   if (alt_max > 0)
936     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL,
937         MIN (max_level, alt_max));
938   else
939     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level);
940
941   return MIN (p, MAX_BUFFERING_LEVEL);
942 }
943
944 static gboolean
945 get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
946     gint * buffering_level)
947 {
948   gint buflevel, buflevel2;
949
950   if (queue->high_watermark <= 0) {
951     if (buffering_level)
952       *buffering_level = MAX_BUFFERING_LEVEL;
953     if (is_buffering)
954       *is_buffering = FALSE;
955     return FALSE;
956   }
957 #define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
958     normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
959
960   if (queue->is_eos) {
961     /* on EOS we are always 100% full, we set the var here so that it we can
962      * reuse the logic below to stop buffering */
963     buflevel = MAX_BUFFERING_LEVEL;
964     GST_LOG_OBJECT (queue, "we are EOS");
965   } else {
966     GST_LOG_OBJECT (queue,
967         "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
968         queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
969         queue->cur_level.buffers);
970
971     /* figure out the buffering level we are filled, we take the max of all formats. */
972     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
973       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0);
974     } else {
975       guint64 rb_size = queue->ring_buffer_max_size;
976       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size);
977     }
978
979     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0);
980     buflevel = MAX (buflevel, buflevel2);
981
982     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0);
983     buflevel = MAX (buflevel, buflevel2);
984
985     /* also apply the rate estimate when we need to */
986     if (queue->use_rate_estimate) {
987       buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0);
988       buflevel = MAX (buflevel, buflevel2);
989     }
990
991     /* Don't get to 0% unless we're really empty */
992     if (queue->cur_level.bytes > 0)
993       buflevel = MAX (1, buflevel);
994   }
995 #undef GET_BUFFER_LEVEL_FOR_QUANTITY
996
997   if (is_buffering)
998     *is_buffering = queue->is_buffering;
999
1000   if (buffering_level)
1001     *buffering_level = buflevel;
1002
1003   GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering,
1004       buflevel);
1005
1006   return TRUE;
1007 }
1008
1009 static gint
1010 convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level)
1011 {
1012   int percent;
1013
1014   /* scale so that if buffering_level equals the high watermark,
1015    * the percentage is 100% */
1016   percent = buffering_level * 100 / queue->high_watermark;
1017   /* clip */
1018   if (percent > 100)
1019     percent = 100;
1020
1021   return percent;
1022 }
1023
1024 static void
1025 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
1026     gint * avg_in, gint * avg_out, gint64 * buffering_left)
1027 {
1028   if (mode) {
1029     if (!QUEUE_IS_USING_QUEUE (queue)) {
1030       if (QUEUE_IS_USING_RING_BUFFER (queue))
1031         *mode = GST_BUFFERING_TIMESHIFT;
1032       else
1033         *mode = GST_BUFFERING_DOWNLOAD;
1034     } else {
1035       *mode = GST_BUFFERING_STREAM;
1036     }
1037   }
1038
1039   if (avg_in)
1040     *avg_in = queue->byte_in_rate;
1041   if (avg_out)
1042     *avg_out = queue->byte_out_rate;
1043
1044   if (buffering_left) {
1045     *buffering_left = (percent == 100 ? 0 : -1);
1046
1047     if (queue->use_rate_estimate) {
1048       guint64 max, cur;
1049
1050       max = queue->max_level.rate_time;
1051       cur = queue->cur_level.rate_time;
1052
1053       if (percent != 100 && max > cur)
1054         *buffering_left = (max - cur) / 1000000;
1055     }
1056   }
1057 }
1058
1059 /* Called with the lock taken */
1060 static GstMessage *
1061 gst_queue2_get_buffering_message (GstQueue2 * queue)
1062 {
1063   GstMessage *msg = NULL;
1064
1065   if (queue->percent_changed) {
1066     gint percent = queue->buffering_percent;
1067
1068     queue->percent_changed = FALSE;
1069
1070     GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
1071     msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
1072
1073     gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
1074         queue->avg_out, queue->buffering_left);
1075   }
1076
1077   return msg;
1078 }
1079
1080 static void
1081 gst_queue2_post_buffering (GstQueue2 * queue)
1082 {
1083   GstMessage *msg = NULL;
1084
1085   g_mutex_lock (&queue->buffering_post_lock);
1086   GST_QUEUE2_MUTEX_LOCK (queue);
1087   msg = gst_queue2_get_buffering_message (queue);
1088   GST_QUEUE2_MUTEX_UNLOCK (queue);
1089
1090   if (msg != NULL)
1091     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
1092
1093   g_mutex_unlock (&queue->buffering_post_lock);
1094 }
1095
1096 static void
1097 update_buffering (GstQueue2 * queue)
1098 {
1099   gint buffering_level, percent;
1100
1101   /* Ensure the variables used to calculate buffering state are up-to-date. */
1102   if (queue->current)
1103     update_cur_level (queue, queue->current);
1104   update_in_rates (queue, FALSE);
1105
1106   if (!get_buffering_level (queue, NULL, &buffering_level))
1107     return;
1108
1109   percent = convert_to_buffering_percent (queue, buffering_level);
1110
1111   if (queue->is_buffering) {
1112     /* if we were buffering see if we reached the high watermark */
1113     if (percent >= 100)
1114       queue->is_buffering = FALSE;
1115
1116     SET_PERCENT (queue, percent);
1117   } else {
1118     /* we were not buffering, check if we need to start buffering if we drop
1119      * below the low threshold */
1120     if (buffering_level < queue->low_watermark) {
1121       queue->is_buffering = TRUE;
1122       SET_PERCENT (queue, percent);
1123     }
1124   }
1125 }
1126
1127 static void
1128 reset_rate_timer (GstQueue2 * queue)
1129 {
1130   queue->bytes_in = 0;
1131   queue->bytes_out = 0;
1132   queue->byte_in_rate = 0.0;
1133   queue->byte_in_period = 0;
1134   queue->byte_out_rate = 0.0;
1135   queue->last_update_in_rates_elapsed = 0.0;
1136   queue->last_in_elapsed = 0.0;
1137   queue->last_out_elapsed = 0.0;
1138   queue->in_timer_started = FALSE;
1139   queue->out_timer_started = FALSE;
1140 }
1141
1142 /* the interval in seconds to recalculate the rate */
1143 #define RATE_INTERVAL    0.2
1144 /* Tuning for rate estimation. We use a large window for the input rate because
1145  * it should be stable when connected to a network. The output rate is less
1146  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1147  * therefore adapt more quickly.
1148  * However, initial input rate may be subject to a burst, and should therefore
1149  * initially also adapt more quickly to changes, and only later on give higher
1150  * weight to previous values. */
1151 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1152 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1153
1154 static void
1155 update_in_rates (GstQueue2 * queue, gboolean force)
1156 {
1157   gdouble elapsed, period;
1158   gdouble byte_in_rate;
1159
1160   if (!queue->in_timer_started) {
1161     queue->in_timer_started = TRUE;
1162     g_timer_start (queue->in_timer);
1163     return;
1164   }
1165
1166   queue->last_update_in_rates_elapsed = elapsed =
1167       g_timer_elapsed (queue->in_timer, NULL);
1168
1169   /* recalc after each interval. */
1170   if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1171     period = elapsed - queue->last_in_elapsed;
1172
1173     GST_DEBUG_OBJECT (queue,
1174         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1175         period, queue->bytes_in, queue->byte_in_period);
1176
1177     byte_in_rate = queue->bytes_in / period;
1178
1179     if (queue->byte_in_rate == 0.0)
1180       queue->byte_in_rate = byte_in_rate;
1181     else
1182       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1183           (double) queue->byte_in_period, period);
1184
1185     /* another data point, cap at 16 for long time running average */
1186     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1187       queue->byte_in_period += period;
1188
1189     /* reset the values to calculate rate over the next interval */
1190     queue->last_in_elapsed = elapsed;
1191     queue->bytes_in = 0;
1192   }
1193
1194   if (queue->byte_in_rate > 0.0) {
1195     queue->cur_level.rate_time =
1196         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1197   }
1198   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1199       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1200 }
1201
1202 static void
1203 update_out_rates (GstQueue2 * queue)
1204 {
1205   gdouble elapsed, period;
1206   gdouble byte_out_rate;
1207
1208   if (!queue->out_timer_started) {
1209     queue->out_timer_started = TRUE;
1210     g_timer_start (queue->out_timer);
1211     return;
1212   }
1213
1214   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1215
1216   /* recalc after each interval. */
1217   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1218     period = elapsed - queue->last_out_elapsed;
1219
1220     GST_DEBUG_OBJECT (queue,
1221         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1222
1223     byte_out_rate = queue->bytes_out / period;
1224
1225     if (queue->byte_out_rate == 0.0)
1226       queue->byte_out_rate = byte_out_rate;
1227     else
1228       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1229
1230     /* reset the values to calculate rate over the next interval */
1231     queue->last_out_elapsed = elapsed;
1232     queue->bytes_out = 0;
1233   }
1234   if (queue->byte_in_rate > 0.0) {
1235     queue->cur_level.rate_time =
1236         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1237   }
1238   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1239       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1240 }
1241
1242 static void
1243 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1244 {
1245   guint64 reading_pos, max_reading_pos;
1246
1247   reading_pos = pos;
1248   max_reading_pos = range->max_reading_pos;
1249
1250   max_reading_pos = MAX (max_reading_pos, reading_pos);
1251
1252   GST_DEBUG_OBJECT (queue,
1253       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1254       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1255   range->max_reading_pos = max_reading_pos;
1256
1257   update_cur_level (queue, range);
1258 }
1259
1260 static gboolean
1261 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1262 {
1263   GstEvent *event;
1264   gboolean res;
1265
1266   /* until we receive the FLUSH_STOP from this seek, we skip data */
1267   queue->seeking = TRUE;
1268   GST_QUEUE2_MUTEX_UNLOCK (queue);
1269
1270   debug_ranges (queue);
1271
1272   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1273
1274   event =
1275       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1276       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1277       GST_SEEK_TYPE_NONE, -1);
1278
1279   res = gst_pad_push_event (queue->sinkpad, event);
1280   GST_QUEUE2_MUTEX_LOCK (queue);
1281
1282   if (res) {
1283     /* Between us sending the seek event and re-acquiring the lock, the source
1284      * thread might already have pushed data and moved along the range's
1285      * writing_pos beyond the seek offset. In that case we don't want to set
1286      * the writing position back to the requested seek position, as it would
1287      * cause data to be written to the wrong offset in the file or ring buffer.
1288      * We still do the add_range call to switch the current range to the
1289      * requested range, or create one if one doesn't exist yet. */
1290     queue->current = add_range (queue, offset, FALSE);
1291   }
1292
1293   return res;
1294 }
1295
1296 /* get the threshold for when we decide to seek rather than wait */
1297 static guint64
1298 get_seek_threshold (GstQueue2 * queue)
1299 {
1300   guint64 threshold;
1301
1302   /* FIXME, find a good threshold based on the incoming rate. */
1303   threshold = 1024 * 512;
1304
1305   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1306     threshold = MIN (threshold,
1307         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1308   }
1309   return threshold;
1310 }
1311
1312 /* see if there is enough data in the file to read a full buffer */
1313 static gboolean
1314 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1315 {
1316   GstQueue2Range *range;
1317
1318   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1319       offset, length);
1320
1321   if ((range = find_range (queue, offset))) {
1322     if (queue->current != range) {
1323       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1324       perform_seek_to_offset (queue, range->writing_pos);
1325     }
1326
1327     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1328         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1329
1330     /* we have a range for offset */
1331     GST_DEBUG_OBJECT (queue,
1332         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1333         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1334
1335     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1336       return TRUE;
1337
1338     if (offset + length <= range->writing_pos)
1339       return TRUE;
1340     else
1341       GST_DEBUG_OBJECT (queue,
1342           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1343           (offset + length) - range->writing_pos);
1344
1345   } else {
1346     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1347         " len %u", offset, length);
1348     /* we don't have the range, see how far away we are */
1349     if (!queue->is_eos && queue->current) {
1350       guint64 threshold = get_seek_threshold (queue);
1351
1352       if (offset >= queue->current->offset && offset <=
1353           queue->current->writing_pos + threshold) {
1354         GST_INFO_OBJECT (queue,
1355             "requested data is within range, wait for data");
1356         return FALSE;
1357       }
1358     }
1359
1360     /* too far away, do a seek */
1361     perform_seek_to_offset (queue, offset);
1362   }
1363
1364   return FALSE;
1365 }
1366
1367 #ifdef HAVE_FSEEKO
1368 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1369 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1370 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1371 #else
1372 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1373 #endif
1374
1375 static GstFlowReturn
1376 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1377     guint8 * dst, gint64 * read_return)
1378 {
1379   guint8 *ring_buffer;
1380   size_t res;
1381
1382   ring_buffer = queue->ring_buffer;
1383
1384   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1385     goto seek_failed;
1386
1387   /* this should not block */
1388   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1389       length, offset);
1390   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1391     res = fread (dst, 1, length, queue->temp_file);
1392   } else {
1393     memcpy (dst, ring_buffer + offset, length);
1394     res = length;
1395   }
1396
1397   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1398
1399   if (G_UNLIKELY (res < length)) {
1400     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1401       goto could_not_read;
1402     /* check for errors or EOF */
1403     if (ferror (queue->temp_file))
1404       goto could_not_read;
1405     if (feof (queue->temp_file) && length > 0)
1406       goto eos;
1407   }
1408
1409   *read_return = res;
1410
1411   return GST_FLOW_OK;
1412
1413 seek_failed:
1414   {
1415     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1416     return GST_FLOW_ERROR;
1417   }
1418 could_not_read:
1419   {
1420     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1421     return GST_FLOW_ERROR;
1422   }
1423 eos:
1424   {
1425     GST_DEBUG ("non-regular file hits EOS");
1426     return GST_FLOW_EOS;
1427   }
1428 }
1429
1430 static GstFlowReturn
1431 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1432     GstBuffer ** buffer)
1433 {
1434   GstBuffer *buf;
1435   GstMapInfo info;
1436   guint8 *data;
1437   guint64 file_offset;
1438   guint block_length, remaining, read_length;
1439   guint64 rb_size;
1440   guint64 max_size;
1441   guint64 rpos;
1442   GstFlowReturn ret = GST_FLOW_OK;
1443
1444   /* allocate the output buffer of the requested size */
1445   if (*buffer == NULL)
1446     buf = gst_buffer_new_allocate (NULL, length, NULL);
1447   else
1448     buf = *buffer;
1449
1450   if (!gst_buffer_map (buf, &info, GST_MAP_WRITE))
1451     goto buffer_write_fail;
1452   data = info.data;
1453
1454   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1455       offset);
1456
1457   rpos = offset;
1458   rb_size = queue->ring_buffer_max_size;
1459   max_size = QUEUE_MAX_BYTES (queue);
1460
1461   remaining = length;
1462   while (remaining > 0) {
1463     /* configure how much/whether to read */
1464     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1465       read_length = 0;
1466
1467       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1468         guint64 level;
1469
1470         /* calculate how far away the offset is */
1471         if (queue->current->writing_pos > rpos)
1472           level = queue->current->writing_pos - rpos;
1473         else
1474           level = 0;
1475
1476         GST_DEBUG_OBJECT (queue,
1477             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1478             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1479             rpos, queue->current->writing_pos, level, max_size);
1480
1481         if (level >= max_size) {
1482           /* we don't have the data but if we have a ring buffer that is full, we
1483            * need to read */
1484           GST_DEBUG_OBJECT (queue,
1485               "ring buffer full, reading QUEUE_MAX_BYTES %"
1486               G_GUINT64_FORMAT " bytes", max_size);
1487           read_length = max_size;
1488         } else if (queue->is_eos) {
1489           /* won't get any more data so read any data we have */
1490           if (level) {
1491             GST_DEBUG_OBJECT (queue,
1492                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1493                 level);
1494             read_length = level;
1495             remaining = level;
1496             length = level;
1497           } else
1498             goto hit_eos;
1499         }
1500       }
1501
1502       if (read_length == 0) {
1503         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1504           GST_DEBUG_OBJECT (queue,
1505               "update current position [%" G_GUINT64_FORMAT "-%"
1506               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1507           update_cur_pos (queue, queue->current, rpos);
1508           GST_QUEUE2_SIGNAL_DEL (queue);
1509         }
1510
1511         if (queue->use_buffering)
1512           update_buffering (queue);
1513
1514         GST_DEBUG_OBJECT (queue, "waiting for add");
1515         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1516         continue;
1517       }
1518     } else {
1519       /* we have the requested data so read it */
1520       read_length = remaining;
1521     }
1522
1523     /* set range reading_pos to actual reading position for this read */
1524     queue->current->reading_pos = rpos;
1525
1526     /* configure how much and from where to read */
1527     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1528       file_offset =
1529           (queue->current->rb_offset + (rpos -
1530               queue->current->offset)) % rb_size;
1531       if (file_offset + read_length > rb_size) {
1532         block_length = rb_size - file_offset;
1533       } else {
1534         block_length = read_length;
1535       }
1536     } else {
1537       file_offset = rpos;
1538       block_length = read_length;
1539     }
1540
1541     /* while we still have data to read, we loop */
1542     while (read_length > 0) {
1543       gint64 read_return;
1544
1545       ret =
1546           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1547           data, &read_return);
1548       if (ret != GST_FLOW_OK)
1549         goto read_error;
1550
1551       file_offset += read_return;
1552       if (QUEUE_IS_USING_RING_BUFFER (queue))
1553         file_offset %= rb_size;
1554
1555       data += read_return;
1556       read_length -= read_return;
1557       block_length = read_length;
1558       remaining -= read_return;
1559
1560       rpos = (queue->current->reading_pos += read_return);
1561       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1562     }
1563     GST_QUEUE2_SIGNAL_DEL (queue);
1564     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1565   }
1566
1567   gst_buffer_unmap (buf, &info);
1568   gst_buffer_resize (buf, 0, length);
1569
1570   GST_BUFFER_OFFSET (buf) = offset;
1571   GST_BUFFER_OFFSET_END (buf) = offset + length;
1572
1573   *buffer = buf;
1574
1575   return ret;
1576
1577   /* ERRORS */
1578 hit_eos:
1579   {
1580     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1581     gst_buffer_unmap (buf, &info);
1582     if (*buffer == NULL)
1583       gst_buffer_unref (buf);
1584     return GST_FLOW_EOS;
1585   }
1586 out_flushing:
1587   {
1588     GST_DEBUG_OBJECT (queue, "we are flushing");
1589     gst_buffer_unmap (buf, &info);
1590     if (*buffer == NULL)
1591       gst_buffer_unref (buf);
1592     return GST_FLOW_FLUSHING;
1593   }
1594 read_error:
1595   {
1596     GST_DEBUG_OBJECT (queue, "we have a read error");
1597     gst_buffer_unmap (buf, &info);
1598     if (*buffer == NULL)
1599       gst_buffer_unref (buf);
1600     return ret;
1601   }
1602 buffer_write_fail:
1603   {
1604     GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL),
1605         ("Can't write to buffer"));
1606     if (*buffer == NULL)
1607       gst_buffer_unref (buf);
1608     return GST_FLOW_ERROR;
1609   }
1610 }
1611
1612 /* should be called with QUEUE_LOCK */
1613 static GstMiniObject *
1614 gst_queue2_read_item_from_file (GstQueue2 * queue)
1615 {
1616   GstMiniObject *item;
1617
1618   if (queue->stream_start_event != NULL) {
1619     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1620     queue->stream_start_event = NULL;
1621   } else if (queue->starting_segment != NULL) {
1622     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1623     queue->starting_segment = NULL;
1624   } else {
1625     GstFlowReturn ret;
1626     GstBuffer *buffer = NULL;
1627     guint64 reading_pos;
1628
1629     reading_pos = queue->current->reading_pos;
1630
1631     ret =
1632         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1633         &buffer);
1634
1635     switch (ret) {
1636       case GST_FLOW_OK:
1637         item = GST_MINI_OBJECT_CAST (buffer);
1638         break;
1639       case GST_FLOW_EOS:
1640         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1641         break;
1642       default:
1643         item = NULL;
1644         break;
1645     }
1646   }
1647   return item;
1648 }
1649
1650 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1651  * the temp filename. */
1652 static gboolean
1653 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1654 {
1655   gint fd = -1;
1656   gchar *name = NULL;
1657
1658   if (queue->temp_file)
1659     goto already_opened;
1660
1661   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1662
1663   /* If temp_template was set, allocate a filename and open that file */
1664
1665   /* nothing to do */
1666   if (queue->temp_template == NULL)
1667     goto no_directory;
1668
1669   /* make copy of the template, we don't want to change this */
1670   name = g_strdup (queue->temp_template);
1671
1672 #ifdef __BIONIC__
1673   fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1674 #else
1675   fd = g_mkstemp (name);
1676 #endif
1677
1678   if (fd == -1)
1679     goto mkstemp_failed;
1680
1681   /* open the file for update/writing */
1682   queue->temp_file = fdopen (fd, "wb+");
1683   /* error creating file */
1684   if (queue->temp_file == NULL)
1685     goto open_failed;
1686
1687   g_free (queue->temp_location);
1688   queue->temp_location = name;
1689
1690   GST_QUEUE2_MUTEX_UNLOCK (queue);
1691
1692   /* we can't emit the notify with the lock */
1693   g_object_notify (G_OBJECT (queue), "temp-location");
1694
1695   GST_QUEUE2_MUTEX_LOCK (queue);
1696
1697   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1698
1699   return TRUE;
1700
1701   /* ERRORS */
1702 already_opened:
1703   {
1704     GST_DEBUG_OBJECT (queue, "temp file was already open");
1705     return TRUE;
1706   }
1707 no_directory:
1708   {
1709     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1710         (_("No Temp directory specified.")), (NULL));
1711     return FALSE;
1712   }
1713 mkstemp_failed:
1714   {
1715     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1716         (_("Could not create temp file \"%s\"."), queue->temp_template),
1717         GST_ERROR_SYSTEM);
1718     g_free (name);
1719     return FALSE;
1720   }
1721 open_failed:
1722   {
1723     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1724         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1725     g_free (name);
1726     if (fd != -1)
1727       close (fd);
1728     return FALSE;
1729   }
1730 }
1731
1732 static void
1733 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1734 {
1735   /* nothing to do */
1736   if (queue->temp_file == NULL)
1737     return;
1738
1739   GST_DEBUG_OBJECT (queue, "closing temp file");
1740
1741   fflush (queue->temp_file);
1742   fclose (queue->temp_file);
1743
1744   if (queue->temp_remove) {
1745     if (remove (queue->temp_location) < 0) {
1746       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1747           queue->temp_location, g_strerror (errno));
1748     }
1749   }
1750
1751   queue->temp_file = NULL;
1752   clean_ranges (queue);
1753 }
1754
1755 static void
1756 gst_queue2_flush_temp_file (GstQueue2 * queue)
1757 {
1758   if (queue->temp_file == NULL)
1759     return;
1760
1761   GST_DEBUG_OBJECT (queue, "flushing temp file");
1762
1763   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1764 }
1765
1766 static void
1767 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1768 {
1769   if (!QUEUE_IS_USING_QUEUE (queue)) {
1770     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1771       gst_queue2_flush_temp_file (queue);
1772     init_ranges (queue);
1773   } else {
1774     while (!g_queue_is_empty (&queue->queue)) {
1775       GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1776
1777       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1778           && GST_EVENT_IS_STICKY (qitem->item)
1779           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1780           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1781         gst_pad_store_sticky_event (queue->srcpad,
1782             GST_EVENT_CAST (qitem->item));
1783       }
1784
1785       /* Then lose another reference because we are supposed to destroy that
1786          data when flushing */
1787       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1788         gst_mini_object_unref (qitem->item);
1789       g_slice_free (GstQueue2Item, qitem);
1790     }
1791   }
1792   queue->last_query = FALSE;
1793   g_cond_signal (&queue->query_handled);
1794   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1795   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1796   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1797   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1798   queue->sink_tainted = queue->src_tainted = TRUE;
1799   if (queue->starting_segment != NULL)
1800     gst_event_unref (queue->starting_segment);
1801   queue->starting_segment = NULL;
1802   queue->segment_event_received = FALSE;
1803   gst_event_replace (&queue->stream_start_event, NULL);
1804
1805   /* we deleted a lot of something */
1806   GST_QUEUE2_SIGNAL_DEL (queue);
1807 }
1808
1809 static gboolean
1810 gst_queue2_wait_free_space (GstQueue2 * queue)
1811 {
1812   /* We make space available if we're "full" according to whatever
1813    * the user defined as "full". */
1814   if (gst_queue2_is_filled (queue)) {
1815     gboolean started;
1816
1817     /* pause the timer while we wait. The fact that we are waiting does not mean
1818      * the byterate on the input pad is lower */
1819     if ((started = queue->in_timer_started))
1820       g_timer_stop (queue->in_timer);
1821
1822     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1823         "queue is full, waiting for free space");
1824     do {
1825       /* Wait for space to be available, we could be unlocked because of a flush. */
1826       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1827     }
1828     while (gst_queue2_is_filled (queue));
1829
1830     /* and continue if we were running before */
1831     if (started)
1832       g_timer_continue (queue->in_timer);
1833   }
1834   return TRUE;
1835
1836   /* ERRORS */
1837 out_flushing:
1838   {
1839     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1840     return FALSE;
1841   }
1842 }
1843
1844 static gboolean
1845 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1846 {
1847   GstMapInfo info;
1848   guint8 *data, *ring_buffer;
1849   guint size, rb_size;
1850   guint64 writing_pos, new_writing_pos;
1851   GstQueue2Range *range, *prev, *next;
1852   gboolean do_seek = FALSE;
1853
1854   if (QUEUE_IS_USING_RING_BUFFER (queue))
1855     writing_pos = queue->current->rb_writing_pos;
1856   else
1857     writing_pos = queue->current->writing_pos;
1858   ring_buffer = queue->ring_buffer;
1859   rb_size = queue->ring_buffer_max_size;
1860
1861   if (!gst_buffer_map (buffer, &info, GST_MAP_READ))
1862     goto buffer_read_error;
1863
1864   size = info.size;
1865   data = info.data;
1866
1867   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1868       writing_pos);
1869
1870   /* sanity check */
1871   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1872       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1873     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1874         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1875         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1876   }
1877
1878   while (size > 0) {
1879     guint to_write;
1880
1881     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1882       gint64 space;
1883
1884       /* calculate the space in the ring buffer not used by data from
1885        * the current range */
1886       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1887         /* wait until there is some free space */
1888         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1889       }
1890       /* get the amount of space we have */
1891       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1892
1893       /* calculate if we need to split or if we can write the entire
1894        * buffer now */
1895       to_write = MIN (size, space);
1896
1897       /* the writing position in the ring buffer after writing (part
1898        * or all of) the buffer */
1899       new_writing_pos = (writing_pos + to_write) % rb_size;
1900
1901       prev = NULL;
1902       range = queue->ranges;
1903
1904       /* if we need to overwrite data in the ring buffer, we need to
1905        * update the ranges
1906        *
1907        * warning: this code is complicated and includes some
1908        * simplifications - pen, paper and diagrams for the cases
1909        * recommended! */
1910       while (range) {
1911         guint64 range_data_start, range_data_end;
1912         GstQueue2Range *range_to_destroy = NULL;
1913
1914         if (range == queue->current)
1915           goto next_range;
1916
1917         range_data_start = range->rb_offset;
1918         range_data_end = range->rb_writing_pos;
1919
1920         /* handle the special case where the range has no data in it */
1921         if (range->writing_pos == range->offset) {
1922           if (range != queue->current) {
1923             GST_DEBUG_OBJECT (queue,
1924                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1925                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1926             /* remove range */
1927             range_to_destroy = range;
1928             if (prev)
1929               prev->next = range->next;
1930           }
1931           goto next_range;
1932         }
1933
1934         if (range_data_end > range_data_start) {
1935           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1936             goto next_range;
1937
1938           if (new_writing_pos > range_data_start) {
1939             if (new_writing_pos >= range_data_end) {
1940               GST_DEBUG_OBJECT (queue,
1941                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1942                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1943               /* remove range */
1944               range_to_destroy = range;
1945               if (prev)
1946                 prev->next = range->next;
1947             } else {
1948               GST_DEBUG_OBJECT (queue,
1949                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1950                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1951                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1952                   range->offset + new_writing_pos - range_data_start,
1953                   new_writing_pos);
1954               range->offset += (new_writing_pos - range_data_start);
1955               range->rb_offset = new_writing_pos;
1956             }
1957           }
1958         } else {
1959           guint64 new_wpos_virt = writing_pos + to_write;
1960
1961           if (new_wpos_virt <= range_data_start)
1962             goto next_range;
1963
1964           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1965             GST_DEBUG_OBJECT (queue,
1966                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1967                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1968             /* remove range */
1969             range_to_destroy = range;
1970             if (prev)
1971               prev->next = range->next;
1972           } else {
1973             GST_DEBUG_OBJECT (queue,
1974                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1975                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1976                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1977                 range->offset + new_writing_pos - range_data_start,
1978                 new_writing_pos);
1979             range->offset += (new_wpos_virt - range_data_start);
1980             range->rb_offset = new_writing_pos;
1981           }
1982         }
1983
1984       next_range:
1985         if (!range_to_destroy)
1986           prev = range;
1987
1988         range = range->next;
1989         if (range_to_destroy) {
1990           if (range_to_destroy == queue->ranges)
1991             queue->ranges = range;
1992           g_slice_free (GstQueue2Range, range_to_destroy);
1993           range_to_destroy = NULL;
1994         }
1995       }
1996     } else {
1997       to_write = size;
1998       new_writing_pos = writing_pos + to_write;
1999     }
2000
2001     if (QUEUE_IS_USING_TEMP_FILE (queue)
2002         && FSEEK_FILE (queue->temp_file, writing_pos))
2003       goto seek_failed;
2004
2005     if (new_writing_pos > writing_pos) {
2006       GST_INFO_OBJECT (queue,
2007           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
2008           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
2009           queue->current->writing_pos, queue->current->rb_writing_pos);
2010       /* either not using ring buffer or no wrapping, just write */
2011       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2012         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
2013           goto handle_error;
2014       } else {
2015         memcpy (ring_buffer + writing_pos, data, to_write);
2016       }
2017
2018       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
2019         /* try to merge with next range */
2020         while ((next = queue->current->next)) {
2021           GST_INFO_OBJECT (queue,
2022               "checking merge with next range %" G_GUINT64_FORMAT " < %"
2023               G_GUINT64_FORMAT, new_writing_pos, next->offset);
2024           if (new_writing_pos < next->offset)
2025             break;
2026
2027           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
2028               next->writing_pos);
2029
2030           /* remove the group */
2031           queue->current->next = next->next;
2032
2033           /* We use the threshold to decide if we want to do a seek or simply
2034            * read the data again. If there is not so much data in the range we
2035            * prefer to avoid to seek and read it again. */
2036           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
2037             /* the new range had more data than the threshold, it's worth keeping
2038              * it and doing a seek. */
2039             new_writing_pos = next->writing_pos;
2040             do_seek = TRUE;
2041           }
2042           g_slice_free (GstQueue2Range, next);
2043         }
2044         goto update_and_signal;
2045       }
2046     } else {
2047       /* wrapping */
2048       guint block_one, block_two;
2049
2050       block_one = rb_size - writing_pos;
2051       block_two = to_write - block_one;
2052
2053       if (block_one > 0) {
2054         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
2055         /* write data to end of ring buffer */
2056         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2057           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
2058             goto handle_error;
2059         } else {
2060           memcpy (ring_buffer + writing_pos, data, block_one);
2061         }
2062       }
2063
2064       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
2065         goto seek_failed;
2066
2067       if (block_two > 0) {
2068         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
2069         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2070           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
2071             goto handle_error;
2072         } else {
2073           memcpy (ring_buffer, data + block_one, block_two);
2074         }
2075       }
2076     }
2077
2078   update_and_signal:
2079     /* update the writing positions */
2080     size -= to_write;
2081     GST_INFO_OBJECT (queue,
2082         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
2083         to_write, writing_pos, size);
2084
2085     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2086       data += to_write;
2087       queue->current->writing_pos += to_write;
2088       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
2089     } else {
2090       queue->current->writing_pos = writing_pos = new_writing_pos;
2091     }
2092     if (do_seek)
2093       perform_seek_to_offset (queue, new_writing_pos);
2094
2095     update_cur_level (queue, queue->current);
2096
2097     /* update the buffering status */
2098     if (queue->use_buffering) {
2099       GstMessage *msg;
2100       update_buffering (queue);
2101       msg = gst_queue2_get_buffering_message (queue);
2102       if (msg) {
2103         GST_QUEUE2_MUTEX_UNLOCK (queue);
2104         g_mutex_lock (&queue->buffering_post_lock);
2105         gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
2106         g_mutex_unlock (&queue->buffering_post_lock);
2107         GST_QUEUE2_MUTEX_LOCK (queue);
2108       }
2109     }
2110
2111     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2112         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2113
2114     GST_QUEUE2_SIGNAL_ADD (queue);
2115   }
2116
2117   gst_buffer_unmap (buffer, &info);
2118
2119   return TRUE;
2120
2121   /* ERRORS */
2122 out_flushing:
2123   {
2124     GST_DEBUG_OBJECT (queue, "we are flushing");
2125     gst_buffer_unmap (buffer, &info);
2126     /* FIXME - GST_FLOW_EOS ? */
2127     return FALSE;
2128   }
2129 seek_failed:
2130   {
2131     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2132     gst_buffer_unmap (buffer, &info);
2133     return FALSE;
2134   }
2135 handle_error:
2136   {
2137     switch (errno) {
2138       case ENOSPC:{
2139         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2140         break;
2141       }
2142       default:{
2143         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2144             (_("Error while writing to download file.")),
2145             ("%s", g_strerror (errno)));
2146       }
2147     }
2148     gst_buffer_unmap (buffer, &info);
2149     return FALSE;
2150   }
2151 buffer_read_error:
2152   {
2153     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL),
2154         ("Can't read from buffer"));
2155     return FALSE;
2156   }
2157 }
2158
2159 static gboolean
2160 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2161 {
2162   GstQueue2 *queue = q;
2163
2164   GST_TRACE_OBJECT (queue,
2165       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2166       gst_buffer_get_size (*buf));
2167
2168   if (!gst_queue2_create_write (queue, *buf)) {
2169     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2170     return FALSE;
2171   }
2172   return TRUE;
2173 }
2174
2175 static gboolean
2176 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
2177 {
2178   guint *p_size = data;
2179   gsize buf_size;
2180
2181   buf_size = gst_buffer_get_size (*buf);
2182   GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
2183   *p_size += buf_size;
2184   return TRUE;
2185 }
2186
2187 /* enqueue an item an update the level stats */
2188 static void
2189 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2190     GstQueue2ItemType item_type)
2191 {
2192   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2193     GstBuffer *buffer;
2194     guint size;
2195
2196     buffer = GST_BUFFER_CAST (item);
2197     size = gst_buffer_get_size (buffer);
2198
2199     /* add buffer to the statistics */
2200     if (QUEUE_IS_USING_QUEUE (queue)) {
2201       queue->cur_level.buffers++;
2202       queue->cur_level.bytes += size;
2203     }
2204     queue->bytes_in += size;
2205
2206     /* apply new buffer to segment stats */
2207     apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE);
2208     /* update the byterate stats */
2209     update_in_rates (queue, FALSE);
2210
2211     if (!QUEUE_IS_USING_QUEUE (queue)) {
2212       /* FIXME - check return value? */
2213       gst_queue2_create_write (queue, buffer);
2214     }
2215   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2216     GstBufferList *buffer_list;
2217     guint size = 0;
2218
2219     buffer_list = GST_BUFFER_LIST_CAST (item);
2220
2221     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2222     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2223
2224     /* add buffer to the statistics */
2225     if (QUEUE_IS_USING_QUEUE (queue)) {
2226       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2227       queue->cur_level.bytes += size;
2228     }
2229     queue->bytes_in += size;
2230
2231     /* apply new buffer to segment stats */
2232     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2233
2234     /* update the byterate stats */
2235     update_in_rates (queue, FALSE);
2236
2237     if (!QUEUE_IS_USING_QUEUE (queue)) {
2238       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2239     }
2240   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2241     GstEvent *event;
2242
2243     event = GST_EVENT_CAST (item);
2244
2245     switch (GST_EVENT_TYPE (event)) {
2246       case GST_EVENT_EOS:
2247         /* Zero the thresholds, this makes sure the queue is completely
2248          * filled and we can read all data from the queue. */
2249         GST_DEBUG_OBJECT (queue, "we have EOS");
2250         queue->is_eos = TRUE;
2251         /* Force updating the input bitrate */
2252         update_in_rates (queue, TRUE);
2253         break;
2254       case GST_EVENT_SEGMENT:
2255         apply_segment (queue, event, &queue->sink_segment, TRUE);
2256         /* This is our first new segment, we hold it
2257          * as we can't save it on the temp file */
2258         if (!QUEUE_IS_USING_QUEUE (queue)) {
2259           if (queue->segment_event_received)
2260             goto unexpected_event;
2261
2262           queue->segment_event_received = TRUE;
2263           if (queue->starting_segment != NULL)
2264             gst_event_unref (queue->starting_segment);
2265           queue->starting_segment = event;
2266           item = NULL;
2267         }
2268         /* a new segment allows us to accept more buffers if we got EOS
2269          * from downstream */
2270         queue->unexpected = FALSE;
2271         break;
2272       case GST_EVENT_GAP:
2273         apply_gap (queue, event, &queue->sink_segment, TRUE);
2274         break;
2275       case GST_EVENT_STREAM_START:
2276         if (!QUEUE_IS_USING_QUEUE (queue)) {
2277           gst_event_replace (&queue->stream_start_event, event);
2278           gst_event_unref (event);
2279           item = NULL;
2280         }
2281         break;
2282       case GST_EVENT_CAPS:{
2283         GstCaps *caps;
2284
2285         gst_event_parse_caps (event, &caps);
2286         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2287
2288         if (!QUEUE_IS_USING_QUEUE (queue)) {
2289           GST_LOG ("Dropping caps event, not using queue");
2290           gst_event_unref (event);
2291           item = NULL;
2292         }
2293         break;
2294       }
2295       default:
2296         if (!QUEUE_IS_USING_QUEUE (queue))
2297           goto unexpected_event;
2298         break;
2299     }
2300   } else if (GST_IS_QUERY (item)) {
2301     /* Can't happen as we check that in the caller */
2302     if (!QUEUE_IS_USING_QUEUE (queue))
2303       g_assert_not_reached ();
2304   } else {
2305     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2306         item, GST_OBJECT_NAME (queue));
2307     /* we can't really unref since we don't know what it is */
2308     item = NULL;
2309   }
2310
2311   if (item) {
2312     /* update the buffering status */
2313     if (queue->use_buffering)
2314       update_buffering (queue);
2315
2316     if (QUEUE_IS_USING_QUEUE (queue)) {
2317       GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2318       qitem->type = item_type;
2319       qitem->item = item;
2320       g_queue_push_tail (&queue->queue, qitem);
2321     } else {
2322       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2323     }
2324
2325     GST_QUEUE2_SIGNAL_ADD (queue);
2326   }
2327
2328   return;
2329
2330   /* ERRORS */
2331 unexpected_event:
2332   {
2333     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2334
2335     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2336         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2337         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2338     gst_event_unref (GST_EVENT_CAST (item));
2339     return;
2340   }
2341 }
2342
2343 /* dequeue an item from the queue and update level stats */
2344 static GstMiniObject *
2345 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2346 {
2347   GstMiniObject *item;
2348
2349   if (!QUEUE_IS_USING_QUEUE (queue)) {
2350     item = gst_queue2_read_item_from_file (queue);
2351   } else {
2352     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2353
2354     if (qitem == NULL)
2355       goto no_item;
2356
2357     item = qitem->item;
2358     g_slice_free (GstQueue2Item, qitem);
2359   }
2360
2361   if (item == NULL)
2362     goto no_item;
2363
2364   if (GST_IS_BUFFER (item)) {
2365     GstBuffer *buffer;
2366     guint size;
2367
2368     buffer = GST_BUFFER_CAST (item);
2369     size = gst_buffer_get_size (buffer);
2370     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2371
2372     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2373         "retrieved buffer %p from queue", buffer);
2374
2375     if (QUEUE_IS_USING_QUEUE (queue)) {
2376       queue->cur_level.buffers--;
2377       queue->cur_level.bytes -= size;
2378     }
2379     queue->bytes_out += size;
2380
2381     apply_buffer (queue, buffer, &queue->src_segment, size, FALSE);
2382     /* update the byterate stats */
2383     update_out_rates (queue);
2384     /* update the buffering */
2385     if (queue->use_buffering)
2386       update_buffering (queue);
2387
2388   } else if (GST_IS_EVENT (item)) {
2389     GstEvent *event = GST_EVENT_CAST (item);
2390
2391     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2392
2393     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2394         "retrieved event %p from queue", event);
2395
2396     switch (GST_EVENT_TYPE (event)) {
2397       case GST_EVENT_EOS:
2398         /* queue is empty now that we dequeued the EOS */
2399         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2400         break;
2401       case GST_EVENT_SEGMENT:
2402         apply_segment (queue, event, &queue->src_segment, FALSE);
2403         break;
2404       case GST_EVENT_GAP:
2405         apply_gap (queue, event, &queue->src_segment, FALSE);
2406         break;
2407       default:
2408         break;
2409     }
2410   } else if (GST_IS_BUFFER_LIST (item)) {
2411     GstBufferList *buffer_list;
2412     guint size = 0;
2413
2414     buffer_list = GST_BUFFER_LIST_CAST (item);
2415     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2416     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2417
2418     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2419         "retrieved buffer list %p from queue", buffer_list);
2420
2421     if (QUEUE_IS_USING_QUEUE (queue)) {
2422       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2423       queue->cur_level.bytes -= size;
2424     }
2425     queue->bytes_out += size;
2426
2427     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2428     /* update the byterate stats */
2429     update_out_rates (queue);
2430     /* update the buffering */
2431     if (queue->use_buffering)
2432       update_buffering (queue);
2433   } else if (GST_IS_QUERY (item)) {
2434     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2435         "retrieved query %p from queue", item);
2436     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2437   } else {
2438     g_warning
2439         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2440         item, GST_OBJECT_NAME (queue));
2441     item = NULL;
2442     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2443   }
2444   GST_QUEUE2_SIGNAL_DEL (queue);
2445
2446   return item;
2447
2448   /* ERRORS */
2449 no_item:
2450   {
2451     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2452     return NULL;
2453   }
2454 }
2455
2456 static gboolean
2457 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2458     GstEvent * event)
2459 {
2460   gboolean ret = TRUE;
2461   GstQueue2 *queue;
2462
2463   queue = GST_QUEUE2 (parent);
2464
2465   switch (GST_EVENT_TYPE (event)) {
2466     case GST_EVENT_FLUSH_START:
2467     {
2468       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2469       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2470         /* forward event */
2471         ret = gst_pad_push_event (queue->srcpad, event);
2472
2473         /* now unblock the chain function */
2474         GST_QUEUE2_MUTEX_LOCK (queue);
2475         queue->srcresult = GST_FLOW_FLUSHING;
2476         queue->sinkresult = GST_FLOW_FLUSHING;
2477         /* unblock the loop and chain functions */
2478         GST_QUEUE2_SIGNAL_ADD (queue);
2479         GST_QUEUE2_SIGNAL_DEL (queue);
2480         GST_QUEUE2_MUTEX_UNLOCK (queue);
2481
2482         /* make sure it pauses, this should happen since we sent
2483          * flush_start downstream. */
2484         gst_pad_pause_task (queue->srcpad);
2485         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2486
2487         GST_QUEUE2_MUTEX_LOCK (queue);
2488         queue->last_query = FALSE;
2489         g_cond_signal (&queue->query_handled);
2490         GST_QUEUE2_MUTEX_UNLOCK (queue);
2491       } else {
2492         GST_QUEUE2_MUTEX_LOCK (queue);
2493         /* flush the sink pad */
2494         queue->sinkresult = GST_FLOW_FLUSHING;
2495         GST_QUEUE2_SIGNAL_DEL (queue);
2496         queue->last_query = FALSE;
2497         g_cond_signal (&queue->query_handled);
2498         GST_QUEUE2_MUTEX_UNLOCK (queue);
2499
2500         gst_event_unref (event);
2501       }
2502       break;
2503     }
2504     case GST_EVENT_FLUSH_STOP:
2505     {
2506       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2507
2508       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2509         /* forward event */
2510         ret = gst_pad_push_event (queue->srcpad, event);
2511
2512         GST_QUEUE2_MUTEX_LOCK (queue);
2513         gst_queue2_locked_flush (queue, FALSE, TRUE);
2514         queue->srcresult = GST_FLOW_OK;
2515         queue->sinkresult = GST_FLOW_OK;
2516         queue->is_eos = FALSE;
2517         queue->unexpected = FALSE;
2518         queue->seeking = FALSE;
2519         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2520         /* reset rate counters */
2521         reset_rate_timer (queue);
2522         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2523             queue->srcpad, NULL);
2524         GST_QUEUE2_MUTEX_UNLOCK (queue);
2525       } else {
2526         GST_QUEUE2_MUTEX_LOCK (queue);
2527         queue->segment_event_received = FALSE;
2528         queue->is_eos = FALSE;
2529         queue->unexpected = FALSE;
2530         queue->sinkresult = GST_FLOW_OK;
2531         queue->seeking = FALSE;
2532         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2533         GST_QUEUE2_MUTEX_UNLOCK (queue);
2534
2535         gst_event_unref (event);
2536       }
2537       break;
2538     }
2539     case GST_EVENT_TAG:{
2540       if (queue->use_tags_bitrate) {
2541         GstTagList *tags;
2542         guint bitrate;
2543
2544         gst_event_parse_tag (event, &tags);
2545         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2546             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2547           GST_QUEUE2_MUTEX_LOCK (queue);
2548           queue->sink_tags_bitrate = bitrate;
2549           GST_QUEUE2_MUTEX_UNLOCK (queue);
2550           GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
2551         }
2552       }
2553       /* Fall-through */
2554     }
2555     default:
2556       if (GST_EVENT_IS_SERIALIZED (event)) {
2557         /* serialized events go in the queue */
2558         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2559         if (queue->srcresult != GST_FLOW_OK) {
2560           /* Errors in sticky event pushing are no problem and ignored here
2561            * as they will cause more meaningful errors during data flow.
2562            * For EOS events, that are not followed by data flow, we still
2563            * return FALSE here though and report an error.
2564            */
2565           if (!GST_EVENT_IS_STICKY (event)) {
2566             goto out_flow_error;
2567           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2568             if (queue->srcresult == GST_FLOW_NOT_LINKED
2569                 || queue->srcresult < GST_FLOW_EOS) {
2570               GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
2571             }
2572             goto out_flow_error;
2573           }
2574         }
2575         /* refuse more events on EOS */
2576         if (queue->is_eos)
2577           goto out_eos;
2578         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2579         GST_QUEUE2_MUTEX_UNLOCK (queue);
2580         gst_queue2_post_buffering (queue);
2581       } else {
2582         /* non-serialized events are passed upstream. */
2583         ret = gst_pad_push_event (queue->srcpad, event);
2584       }
2585       break;
2586   }
2587   return ret;
2588
2589   /* ERRORS */
2590 out_flushing:
2591   {
2592     GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2593     GST_QUEUE2_MUTEX_UNLOCK (queue);
2594     gst_event_unref (event);
2595     return FALSE;
2596   }
2597 out_eos:
2598   {
2599     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2600     GST_QUEUE2_MUTEX_UNLOCK (queue);
2601     gst_event_unref (event);
2602     return FALSE;
2603   }
2604 out_flow_error:
2605   {
2606     GST_LOG_OBJECT (queue,
2607         "refusing event, we have a downstream flow error: %s",
2608         gst_flow_get_name (queue->srcresult));
2609     GST_QUEUE2_MUTEX_UNLOCK (queue);
2610     gst_event_unref (event);
2611     return FALSE;
2612   }
2613 }
2614
2615 static gboolean
2616 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2617     GstQuery * query)
2618 {
2619   GstQueue2 *queue;
2620   gboolean res;
2621
2622   queue = GST_QUEUE2 (parent);
2623
2624   switch (GST_QUERY_TYPE (query)) {
2625     default:
2626       if (GST_QUERY_IS_SERIALIZED (query)) {
2627         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2628         /* serialized events go in the queue. We need to be certain that we
2629          * don't cause deadlocks waiting for the query return value. We check if
2630          * the queue is empty (nothing is blocking downstream and the query can
2631          * be pushed for sure) or we are not buffering. If we are buffering,
2632          * the pipeline waits to unblock downstream until our queue fills up
2633          * completely, which can not happen if we block on the query..
2634          * Therefore we only potentially block when we are not buffering. */
2635         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2636         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2637                 || !queue->use_buffering)) {
2638           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2639             gst_queue2_locked_enqueue (queue, query,
2640                 GST_QUEUE2_ITEM_TYPE_QUERY);
2641
2642             STATUS (queue, queue->sinkpad, "wait for QUERY");
2643             g_cond_wait (&queue->query_handled, &queue->qlock);
2644             if (queue->sinkresult != GST_FLOW_OK)
2645               goto out_flushing;
2646             res = queue->last_query;
2647           } else {
2648             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2649             res = FALSE;
2650           }
2651         } else {
2652           GST_DEBUG_OBJECT (queue,
2653               "refusing query, we are not using the queue");
2654           res = FALSE;
2655         }
2656         GST_QUEUE2_MUTEX_UNLOCK (queue);
2657         gst_queue2_post_buffering (queue);
2658       } else {
2659         res = gst_pad_query_default (pad, parent, query);
2660       }
2661       break;
2662   }
2663   return res;
2664
2665   /* ERRORS */
2666 out_flushing:
2667   {
2668     GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
2669     GST_QUEUE2_MUTEX_UNLOCK (queue);
2670     return FALSE;
2671   }
2672 }
2673
2674 static gboolean
2675 gst_queue2_is_empty (GstQueue2 * queue)
2676 {
2677   /* never empty on EOS */
2678   if (queue->is_eos)
2679     return FALSE;
2680
2681   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2682     return queue->current->writing_pos <= queue->current->max_reading_pos;
2683   } else {
2684     if (queue->queue.length == 0)
2685       return TRUE;
2686   }
2687
2688   return FALSE;
2689 }
2690
2691 static gboolean
2692 gst_queue2_is_filled (GstQueue2 * queue)
2693 {
2694   gboolean res;
2695
2696   /* always filled on EOS */
2697   if (queue->is_eos)
2698     return TRUE;
2699
2700 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2701     (queue->cur_level.format) >= ((alt_max) ? \
2702       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2703
2704   /* if using a ring buffer we're filled if all ring buffer space is used
2705    * _by the current range_ */
2706   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2707     guint64 rb_size = queue->ring_buffer_max_size;
2708     GST_DEBUG_OBJECT (queue,
2709         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2710         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2711     return CHECK_FILLED (bytes, rb_size);
2712   }
2713
2714   /* if using file, we're never filled if we don't have EOS */
2715   if (QUEUE_IS_USING_TEMP_FILE (queue))
2716     return FALSE;
2717
2718   /* we are never filled when we have no buffers at all */
2719   if (queue->cur_level.buffers == 0)
2720     return FALSE;
2721
2722   /* we are filled if one of the current levels exceeds the max */
2723   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2724       || CHECK_FILLED (time, 0);
2725
2726   /* if we need to, use the rate estimate to check against the max time we are
2727    * allowed to queue */
2728   if (queue->use_rate_estimate)
2729     res |= CHECK_FILLED (rate_time, 0);
2730
2731 #undef CHECK_FILLED
2732   return res;
2733 }
2734
2735 static GstFlowReturn
2736 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2737     GstMiniObject * item, GstQueue2ItemType item_type)
2738 {
2739   /* we have to lock the queue since we span threads */
2740   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2741   /* when we received EOS, we refuse more data */
2742   if (queue->is_eos)
2743     goto out_eos;
2744   /* when we received unexpected from downstream, refuse more buffers */
2745   if (queue->unexpected)
2746     goto out_unexpected;
2747
2748   /* while we didn't receive the newsegment, we're seeking and we skip data */
2749   if (queue->seeking)
2750     goto out_seeking;
2751
2752   if (!gst_queue2_wait_free_space (queue))
2753     goto out_flushing;
2754
2755   /* put buffer in queue now */
2756   gst_queue2_locked_enqueue (queue, item, item_type);
2757   GST_QUEUE2_MUTEX_UNLOCK (queue);
2758   gst_queue2_post_buffering (queue);
2759
2760   return GST_FLOW_OK;
2761
2762   /* special conditions */
2763 out_flushing:
2764   {
2765     GstFlowReturn ret = queue->sinkresult;
2766
2767     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2768         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2769     GST_QUEUE2_MUTEX_UNLOCK (queue);
2770     gst_mini_object_unref (item);
2771
2772     return ret;
2773   }
2774 out_eos:
2775   {
2776     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2777     GST_QUEUE2_MUTEX_UNLOCK (queue);
2778     gst_mini_object_unref (item);
2779
2780     return GST_FLOW_EOS;
2781   }
2782 out_seeking:
2783   {
2784     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2785     GST_QUEUE2_MUTEX_UNLOCK (queue);
2786     gst_mini_object_unref (item);
2787
2788     return GST_FLOW_OK;
2789   }
2790 out_unexpected:
2791   {
2792     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2793     GST_QUEUE2_MUTEX_UNLOCK (queue);
2794     gst_mini_object_unref (item);
2795
2796     return GST_FLOW_EOS;
2797   }
2798 }
2799
2800 static GstFlowReturn
2801 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2802 {
2803   GstQueue2 *queue;
2804
2805   queue = GST_QUEUE2 (parent);
2806
2807   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2808       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2809       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2810       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2811       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2812
2813   return gst_queue2_chain_buffer_or_buffer_list (queue,
2814       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2815 }
2816
2817 static GstFlowReturn
2818 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2819     GstBufferList * buffer_list)
2820 {
2821   GstQueue2 *queue;
2822
2823   queue = GST_QUEUE2 (parent);
2824
2825   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2826       "received buffer list %p", buffer_list);
2827
2828   return gst_queue2_chain_buffer_or_buffer_list (queue,
2829       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2830 }
2831
2832 static GstMiniObject *
2833 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2834 {
2835   GstMiniObject *data;
2836
2837   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2838
2839   /* stop pushing buffers, we dequeue all items until we see an item that we
2840    * can push again, which is EOS or SEGMENT. If there is nothing in the
2841    * queue we can push, we set a flag to make the sinkpad refuse more
2842    * buffers with an EOS return value until we receive something
2843    * pushable again or we get flushed. */
2844   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2845     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2846       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2847           "dropping EOS buffer %p", data);
2848       gst_buffer_unref (GST_BUFFER_CAST (data));
2849     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2850       GstEvent *event = GST_EVENT_CAST (data);
2851       GstEventType type = GST_EVENT_TYPE (event);
2852
2853       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2854         /* we found a pushable item in the queue, push it out */
2855         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2856             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2857         return data;
2858       }
2859       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2860           "dropping EOS event %p", event);
2861       gst_event_unref (event);
2862     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2863       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2864           "dropping EOS buffer list %p", data);
2865       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2866     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2867       queue->last_query = FALSE;
2868       g_cond_signal (&queue->query_handled);
2869       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2870     }
2871   }
2872   /* no more items in the queue. Set the unexpected flag so that upstream
2873    * make us refuse any more buffers on the sinkpad. Since we will still
2874    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2875    * task function does not shut down. */
2876   queue->unexpected = TRUE;
2877   return NULL;
2878 }
2879
2880 /* dequeue an item from the queue an push it downstream. This functions returns
2881  * the result of the push. */
2882 static GstFlowReturn
2883 gst_queue2_push_one (GstQueue2 * queue)
2884 {
2885   GstFlowReturn result = queue->srcresult;
2886   GstMiniObject *data;
2887   GstQueue2ItemType item_type;
2888
2889   data = gst_queue2_locked_dequeue (queue, &item_type);
2890   if (data == NULL)
2891     goto no_item;
2892
2893 next:
2894   STATUS (queue, queue->srcpad, "We have something dequeud");
2895   g_atomic_int_set (&queue->downstream_may_block,
2896       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2897       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2898   GST_QUEUE2_MUTEX_UNLOCK (queue);
2899   gst_queue2_post_buffering (queue);
2900
2901   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2902     GstBuffer *buffer;
2903
2904     buffer = GST_BUFFER_CAST (data);
2905
2906     result = gst_pad_push (queue->srcpad, buffer);
2907     g_atomic_int_set (&queue->downstream_may_block, 0);
2908
2909     /* need to check for srcresult here as well */
2910     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2911     if (result == GST_FLOW_EOS) {
2912       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2913       if (data != NULL)
2914         goto next;
2915       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2916        * to the caller so that the task function does not shut down */
2917       result = GST_FLOW_OK;
2918     }
2919   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2920     GstEvent *event = GST_EVENT_CAST (data);
2921     GstEventType type = GST_EVENT_TYPE (event);
2922
2923     if (type == GST_EVENT_TAG) {
2924       if (queue->use_tags_bitrate) {
2925         GstTagList *tags;
2926         guint bitrate;
2927
2928         gst_event_parse_tag (event, &tags);
2929         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2930             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2931           GST_QUEUE2_MUTEX_LOCK (queue);
2932           queue->src_tags_bitrate = bitrate;
2933           GST_QUEUE2_MUTEX_UNLOCK (queue);
2934           GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
2935         }
2936       }
2937     }
2938
2939     gst_pad_push_event (queue->srcpad, event);
2940
2941     /* if we're EOS, return EOS so that the task pauses. */
2942     if (type == GST_EVENT_EOS) {
2943       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2944           "pushed EOS event %p, return EOS", event);
2945       result = GST_FLOW_EOS;
2946     }
2947
2948     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2949   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2950     GstBufferList *buffer_list;
2951
2952     buffer_list = GST_BUFFER_LIST_CAST (data);
2953
2954     result = gst_pad_push_list (queue->srcpad, buffer_list);
2955     g_atomic_int_set (&queue->downstream_may_block, 0);
2956
2957     /* need to check for srcresult here as well */
2958     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2959     if (result == GST_FLOW_EOS) {
2960       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2961       if (data != NULL)
2962         goto next;
2963       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2964        * to the caller so that the task function does not shut down */
2965       result = GST_FLOW_OK;
2966     }
2967   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2968     GstQuery *query = GST_QUERY_CAST (data);
2969
2970     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2971     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2972     GST_LOG_OBJECT (queue->srcpad, "Peered query");
2973     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2974         "did query %p, return %d", query, queue->last_query);
2975     g_cond_signal (&queue->query_handled);
2976     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2977     result = GST_FLOW_OK;
2978   }
2979   return result;
2980
2981   /* ERRORS */
2982 no_item:
2983   {
2984     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2985         "exit because we have no item in the queue");
2986     return GST_FLOW_ERROR;
2987   }
2988 out_flushing:
2989   {
2990     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2991     return GST_FLOW_FLUSHING;
2992   }
2993 }
2994
2995 /* called repeatedly with @pad as the source pad. This function should push out
2996  * data to the peer element. */
2997 static void
2998 gst_queue2_loop (GstPad * pad)
2999 {
3000   GstQueue2 *queue;
3001   GstFlowReturn ret;
3002
3003   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
3004
3005   /* have to lock for thread-safety */
3006   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3007
3008   if (gst_queue2_is_empty (queue)) {
3009     gboolean started;
3010
3011     /* pause the timer while we wait. The fact that we are waiting does not mean
3012      * the byterate on the output pad is lower */
3013     if ((started = queue->out_timer_started))
3014       g_timer_stop (queue->out_timer);
3015
3016     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
3017         "queue is empty, waiting for new data");
3018     do {
3019       /* Wait for data to be available, we could be unlocked because of a flush. */
3020       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
3021     }
3022     while (gst_queue2_is_empty (queue));
3023
3024     /* and continue if we were running before */
3025     if (started)
3026       g_timer_continue (queue->out_timer);
3027   }
3028   ret = gst_queue2_push_one (queue);
3029   queue->srcresult = ret;
3030   queue->sinkresult = ret;
3031   if (ret != GST_FLOW_OK)
3032     goto out_flushing;
3033
3034   GST_QUEUE2_MUTEX_UNLOCK (queue);
3035   gst_queue2_post_buffering (queue);
3036
3037   return;
3038
3039   /* ERRORS */
3040 out_flushing:
3041   {
3042     gboolean eos = queue->is_eos;
3043     GstFlowReturn ret = queue->srcresult;
3044
3045     gst_pad_pause_task (queue->srcpad);
3046     if (ret == GST_FLOW_FLUSHING) {
3047       gst_queue2_locked_flush (queue, FALSE, FALSE);
3048     } else {
3049       GST_QUEUE2_SIGNAL_DEL (queue);
3050       queue->last_query = FALSE;
3051       g_cond_signal (&queue->query_handled);
3052     }
3053     GST_QUEUE2_MUTEX_UNLOCK (queue);
3054     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3055         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
3056     /* let app know about us giving up if upstream is not expected to do so */
3057     /* EOS is already taken care of elsewhere */
3058     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
3059       GST_ELEMENT_FLOW_ERROR (queue, ret);
3060       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
3061     }
3062     return;
3063   }
3064 }
3065
3066 static gboolean
3067 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3068 {
3069   gboolean res = TRUE;
3070   GstQueue2 *queue = GST_QUEUE2 (parent);
3071
3072 #ifndef GST_DISABLE_GST_DEBUG
3073   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
3074       event, GST_EVENT_TYPE_NAME (event));
3075 #endif
3076
3077   switch (GST_EVENT_TYPE (event)) {
3078     case GST_EVENT_FLUSH_START:
3079       if (QUEUE_IS_USING_QUEUE (queue)) {
3080         /* just forward upstream */
3081         res = gst_pad_push_event (queue->sinkpad, event);
3082       } else {
3083         /* now unblock the getrange function */
3084         GST_QUEUE2_MUTEX_LOCK (queue);
3085         GST_DEBUG_OBJECT (queue, "flushing");
3086         queue->srcresult = GST_FLOW_FLUSHING;
3087         GST_QUEUE2_SIGNAL_ADD (queue);
3088         GST_QUEUE2_MUTEX_UNLOCK (queue);
3089
3090         /* when using a temp file, we eat the event */
3091         res = TRUE;
3092         gst_event_unref (event);
3093       }
3094       break;
3095     case GST_EVENT_FLUSH_STOP:
3096       if (QUEUE_IS_USING_QUEUE (queue)) {
3097         /* just forward upstream */
3098         res = gst_pad_push_event (queue->sinkpad, event);
3099       } else {
3100         /* now unblock the getrange function */
3101         GST_QUEUE2_MUTEX_LOCK (queue);
3102         queue->srcresult = GST_FLOW_OK;
3103         GST_QUEUE2_MUTEX_UNLOCK (queue);
3104
3105         /* when using a temp file, we eat the event */
3106         res = TRUE;
3107         gst_event_unref (event);
3108       }
3109       break;
3110     case GST_EVENT_RECONFIGURE:
3111       GST_QUEUE2_MUTEX_LOCK (queue);
3112       /* assume downstream is linked now and try to push again */
3113       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
3114         queue->srcresult = GST_FLOW_OK;
3115         queue->sinkresult = GST_FLOW_OK;
3116         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
3117           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
3118               NULL);
3119         }
3120       }
3121       GST_QUEUE2_MUTEX_UNLOCK (queue);
3122
3123       res = gst_pad_push_event (queue->sinkpad, event);
3124       break;
3125     default:
3126       res = gst_pad_push_event (queue->sinkpad, event);
3127       break;
3128   }
3129
3130   return res;
3131 }
3132
3133 static gboolean
3134 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3135 {
3136   GstQueue2 *queue;
3137
3138   queue = GST_QUEUE2 (parent);
3139
3140   switch (GST_QUERY_TYPE (query)) {
3141     case GST_QUERY_POSITION:
3142     {
3143       gint64 peer_pos;
3144       GstFormat format;
3145
3146       if (!gst_pad_peer_query (queue->sinkpad, query))
3147         goto peer_failed;
3148
3149       /* get peer position */
3150       gst_query_parse_position (query, &format, &peer_pos);
3151
3152       /* FIXME: this code assumes that there's no discont in the queue */
3153       switch (format) {
3154         case GST_FORMAT_BYTES:
3155           peer_pos -= queue->cur_level.bytes;
3156           break;
3157         case GST_FORMAT_TIME:
3158           peer_pos -= queue->cur_level.time;
3159           break;
3160         default:
3161           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3162               "know how to adjust value", gst_format_get_name (format));
3163           return FALSE;
3164       }
3165       /* set updated position */
3166       gst_query_set_position (query, format, peer_pos);
3167       break;
3168     }
3169     case GST_QUERY_DURATION:
3170     {
3171       GST_DEBUG_OBJECT (queue, "doing peer query");
3172
3173       if (!gst_pad_peer_query (queue->sinkpad, query))
3174         goto peer_failed;
3175
3176       GST_DEBUG_OBJECT (queue, "peer query success");
3177       break;
3178     }
3179     case GST_QUERY_BUFFERING:
3180     {
3181       gint percent;
3182       gboolean is_buffering;
3183       GstBufferingMode mode;
3184       gint avg_in, avg_out;
3185       gint64 buffering_left;
3186
3187       GST_DEBUG_OBJECT (queue, "query buffering");
3188
3189       get_buffering_level (queue, &is_buffering, &percent);
3190       percent = convert_to_buffering_percent (queue, percent);
3191       gst_query_set_buffering_percent (query, is_buffering, percent);
3192
3193       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3194           &buffering_left);
3195       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3196           buffering_left);
3197
3198       if (!QUEUE_IS_USING_QUEUE (queue)) {
3199         /* add ranges for download and ringbuffer buffering */
3200         GstFormat format;
3201         gint64 start, stop, range_start, range_stop;
3202         guint64 writing_pos;
3203         gint64 estimated_total;
3204         gint64 duration;
3205         gboolean peer_res, is_eos;
3206         GstQueue2Range *queued_ranges;
3207
3208         /* we need a current download region */
3209         if (queue->current == NULL)
3210           return FALSE;
3211
3212         writing_pos = queue->current->writing_pos;
3213         is_eos = queue->is_eos;
3214
3215         if (is_eos) {
3216           /* we're EOS, we know the duration in bytes now */
3217           peer_res = TRUE;
3218           duration = writing_pos;
3219         } else {
3220           /* get duration of upstream in bytes */
3221           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3222               GST_FORMAT_BYTES, &duration);
3223         }
3224
3225         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3226             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3227
3228         /* calculate remaining and total download time */
3229         if (peer_res && avg_in > 0.0)
3230           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3231         else
3232           estimated_total = -1;
3233
3234         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3235             estimated_total);
3236
3237         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3238
3239         switch (format) {
3240           case GST_FORMAT_PERCENT:
3241             /* we need duration */
3242             if (!peer_res)
3243               goto peer_failed;
3244
3245             start = 0;
3246             /* get our available data relative to the duration */
3247             if (duration != -1)
3248               stop =
3249                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3250                   duration);
3251             else
3252               stop = -1;
3253             break;
3254           case GST_FORMAT_BYTES:
3255             start = 0;
3256             stop = writing_pos;
3257             break;
3258           default:
3259             start = -1;
3260             stop = -1;
3261             break;
3262         }
3263
3264         /* fill out the buffered ranges */
3265         for (queued_ranges = queue->ranges; queued_ranges;
3266             queued_ranges = queued_ranges->next) {
3267           switch (format) {
3268             case GST_FORMAT_PERCENT:
3269               if (duration == -1) {
3270                 range_start = 0;
3271                 range_stop = 0;
3272                 break;
3273               }
3274               range_start =
3275                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3276                   queued_ranges->offset, duration);
3277               range_stop =
3278                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3279                   queued_ranges->writing_pos, duration);
3280               break;
3281             case GST_FORMAT_BYTES:
3282               range_start = queued_ranges->offset;
3283               range_stop = queued_ranges->writing_pos;
3284               break;
3285             default:
3286               range_start = -1;
3287               range_stop = -1;
3288               break;
3289           }
3290           if (range_start == range_stop)
3291             continue;
3292           GST_DEBUG_OBJECT (queue,
3293               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3294               G_GINT64_FORMAT, range_start, range_stop);
3295           gst_query_add_buffering_range (query, range_start, range_stop);
3296         }
3297
3298         gst_query_set_buffering_range (query, format, start, stop,
3299             estimated_total);
3300       }
3301       break;
3302     }
3303     case GST_QUERY_SCHEDULING:
3304     {
3305       gboolean pull_mode;
3306       GstSchedulingFlags flags = 0;
3307
3308       if (!gst_pad_peer_query (queue->sinkpad, query))
3309         goto peer_failed;
3310
3311       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3312
3313       /* we can operate in pull mode when we are using a tempfile */
3314       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3315
3316       if (pull_mode)
3317         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3318       gst_query_set_scheduling (query, flags, 0, -1, 0);
3319       if (pull_mode)
3320         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3321       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3322       break;
3323     }
3324     default:
3325       /* peer handled other queries */
3326       if (!gst_pad_query_default (pad, parent, query))
3327         goto peer_failed;
3328       break;
3329   }
3330
3331   return TRUE;
3332
3333   /* ERRORS */
3334 peer_failed:
3335   {
3336     GST_DEBUG_OBJECT (queue, "failed peer query");
3337     return FALSE;
3338   }
3339 }
3340
3341 static gboolean
3342 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3343 {
3344   GstQueue2 *queue = GST_QUEUE2 (element);
3345
3346   /* simply forward to the srcpad query function */
3347   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3348       query);
3349 }
3350
3351 static void
3352 gst_queue2_update_upstream_size (GstQueue2 * queue)
3353 {
3354   gint64 upstream_size = -1;
3355
3356   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3357           &upstream_size)) {
3358     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3359
3360     /* upstream_size can be negative but queue->upstream_size is unsigned.
3361      * Prevent setting negative values to it (the query can return -1) */
3362     if (upstream_size >= 0)
3363       queue->upstream_size = upstream_size;
3364     else
3365       queue->upstream_size = 0;
3366   }
3367 }
3368
3369 static GstFlowReturn
3370 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3371     guint length, GstBuffer ** buffer)
3372 {
3373   GstQueue2 *queue;
3374   GstFlowReturn ret;
3375
3376   queue = GST_QUEUE2_CAST (parent);
3377
3378   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3379   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3380   offset = (offset == -1) ? queue->current->reading_pos : offset;
3381
3382   GST_DEBUG_OBJECT (queue,
3383       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3384
3385   /* catch any reads beyond the size of the file here to make sure queue2
3386    * doesn't send seek events beyond the size of the file upstream, since
3387    * that would confuse elements such as souphttpsrc and/or http servers.
3388    * Demuxers often just loop until EOS at the end of the file to figure out
3389    * when they've read all the end-headers or index chunks. */
3390   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3391     gst_queue2_update_upstream_size (queue);
3392     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3393       goto out_unexpected;
3394   }
3395
3396   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3397     gst_queue2_update_upstream_size (queue);
3398     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3399       length = queue->upstream_size - offset;
3400       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3401     }
3402   }
3403
3404   /* FIXME - function will block when the range is not yet available */
3405   ret = gst_queue2_create_read (queue, offset, length, buffer);
3406   GST_QUEUE2_MUTEX_UNLOCK (queue);
3407   gst_queue2_post_buffering (queue);
3408
3409   return ret;
3410
3411   /* ERRORS */
3412 out_flushing:
3413   {
3414     ret = queue->srcresult;
3415
3416     GST_DEBUG_OBJECT (queue, "we are flushing");
3417     GST_QUEUE2_MUTEX_UNLOCK (queue);
3418     return ret;
3419   }
3420 out_unexpected:
3421   {
3422     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3423     GST_QUEUE2_MUTEX_UNLOCK (queue);
3424     return GST_FLOW_EOS;
3425   }
3426 }
3427
3428 /* sink currently only operates in push mode */
3429 static gboolean
3430 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3431     GstPadMode mode, gboolean active)
3432 {
3433   gboolean result;
3434   GstQueue2 *queue;
3435
3436   queue = GST_QUEUE2 (parent);
3437
3438   switch (mode) {
3439     case GST_PAD_MODE_PUSH:
3440       if (active) {
3441         GST_QUEUE2_MUTEX_LOCK (queue);
3442         GST_DEBUG_OBJECT (queue, "activating push mode");
3443         queue->srcresult = GST_FLOW_OK;
3444         queue->sinkresult = GST_FLOW_OK;
3445         queue->is_eos = FALSE;
3446         queue->unexpected = FALSE;
3447         reset_rate_timer (queue);
3448         GST_QUEUE2_MUTEX_UNLOCK (queue);
3449       } else {
3450         /* unblock chain function */
3451         GST_QUEUE2_MUTEX_LOCK (queue);
3452         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3453         queue->srcresult = GST_FLOW_FLUSHING;
3454         queue->sinkresult = GST_FLOW_FLUSHING;
3455         GST_QUEUE2_SIGNAL_DEL (queue);
3456         GST_QUEUE2_MUTEX_UNLOCK (queue);
3457
3458         /* wait until it is unblocked and clean up */
3459         GST_PAD_STREAM_LOCK (pad);
3460         GST_QUEUE2_MUTEX_LOCK (queue);
3461         gst_queue2_locked_flush (queue, TRUE, FALSE);
3462         GST_QUEUE2_MUTEX_UNLOCK (queue);
3463         GST_PAD_STREAM_UNLOCK (pad);
3464       }
3465       result = TRUE;
3466       break;
3467     default:
3468       result = FALSE;
3469       break;
3470   }
3471   return result;
3472 }
3473
3474 /* src operating in push mode, we start a task on the source pad that pushes out
3475  * buffers from the queue */
3476 static gboolean
3477 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3478 {
3479   gboolean result = FALSE;
3480   GstQueue2 *queue;
3481
3482   queue = GST_QUEUE2 (parent);
3483
3484   if (active) {
3485     GST_QUEUE2_MUTEX_LOCK (queue);
3486     GST_DEBUG_OBJECT (queue, "activating push mode");
3487     queue->srcresult = GST_FLOW_OK;
3488     queue->sinkresult = GST_FLOW_OK;
3489     queue->is_eos = FALSE;
3490     queue->unexpected = FALSE;
3491     result =
3492         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3493     GST_QUEUE2_MUTEX_UNLOCK (queue);
3494   } else {
3495     /* unblock loop function */
3496     GST_QUEUE2_MUTEX_LOCK (queue);
3497     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3498     queue->srcresult = GST_FLOW_FLUSHING;
3499     queue->sinkresult = GST_FLOW_FLUSHING;
3500     /* the item add signal will unblock */
3501     GST_QUEUE2_SIGNAL_ADD (queue);
3502     GST_QUEUE2_MUTEX_UNLOCK (queue);
3503
3504     /* step 2, make sure streaming finishes */
3505     result = gst_pad_stop_task (pad);
3506   }
3507
3508   return result;
3509 }
3510
3511 /* pull mode, downstream will call our getrange function */
3512 static gboolean
3513 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3514 {
3515   gboolean result;
3516   GstQueue2 *queue;
3517
3518   queue = GST_QUEUE2 (parent);
3519
3520   if (active) {
3521     GST_QUEUE2_MUTEX_LOCK (queue);
3522     if (!QUEUE_IS_USING_QUEUE (queue)) {
3523       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3524         /* open the temp file now */
3525         result = gst_queue2_open_temp_location_file (queue);
3526       } else if (!queue->ring_buffer) {
3527         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3528         result = ! !queue->ring_buffer;
3529       } else {
3530         result = TRUE;
3531       }
3532
3533       GST_DEBUG_OBJECT (queue, "activating pull mode");
3534       init_ranges (queue);
3535       queue->srcresult = GST_FLOW_OK;
3536       queue->sinkresult = GST_FLOW_OK;
3537       queue->is_eos = FALSE;
3538       queue->unexpected = FALSE;
3539       queue->upstream_size = 0;
3540     } else {
3541       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3542       /* this is not allowed, we cannot operate in pull mode without a temp
3543        * file. */
3544       queue->srcresult = GST_FLOW_FLUSHING;
3545       queue->sinkresult = GST_FLOW_FLUSHING;
3546       result = FALSE;
3547     }
3548     GST_QUEUE2_MUTEX_UNLOCK (queue);
3549   } else {
3550     GST_QUEUE2_MUTEX_LOCK (queue);
3551     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3552     queue->srcresult = GST_FLOW_FLUSHING;
3553     queue->sinkresult = GST_FLOW_FLUSHING;
3554     /* this will unlock getrange */
3555     GST_QUEUE2_SIGNAL_ADD (queue);
3556     result = TRUE;
3557     GST_QUEUE2_MUTEX_UNLOCK (queue);
3558   }
3559
3560   return result;
3561 }
3562
3563 static gboolean
3564 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3565     gboolean active)
3566 {
3567   gboolean res;
3568
3569   switch (mode) {
3570     case GST_PAD_MODE_PULL:
3571       res = gst_queue2_src_activate_pull (pad, parent, active);
3572       break;
3573     case GST_PAD_MODE_PUSH:
3574       res = gst_queue2_src_activate_push (pad, parent, active);
3575       break;
3576     default:
3577       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3578       res = FALSE;
3579       break;
3580   }
3581   return res;
3582 }
3583
3584 static GstStateChangeReturn
3585 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3586 {
3587   GstQueue2 *queue;
3588   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3589
3590   queue = GST_QUEUE2 (element);
3591
3592   switch (transition) {
3593     case GST_STATE_CHANGE_NULL_TO_READY:
3594       break;
3595     case GST_STATE_CHANGE_READY_TO_PAUSED:
3596       GST_QUEUE2_MUTEX_LOCK (queue);
3597       if (!QUEUE_IS_USING_QUEUE (queue)) {
3598         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3599           if (!gst_queue2_open_temp_location_file (queue))
3600             ret = GST_STATE_CHANGE_FAILURE;
3601         } else {
3602           if (queue->ring_buffer) {
3603             g_free (queue->ring_buffer);
3604             queue->ring_buffer = NULL;
3605           }
3606           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3607             ret = GST_STATE_CHANGE_FAILURE;
3608         }
3609         init_ranges (queue);
3610       }
3611       queue->segment_event_received = FALSE;
3612       queue->starting_segment = NULL;
3613       gst_event_replace (&queue->stream_start_event, NULL);
3614       GST_QUEUE2_MUTEX_UNLOCK (queue);
3615       break;
3616     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3617       break;
3618     default:
3619       break;
3620   }
3621
3622   if (ret == GST_STATE_CHANGE_FAILURE)
3623     return ret;
3624
3625   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3626
3627   if (ret == GST_STATE_CHANGE_FAILURE)
3628     return ret;
3629
3630   switch (transition) {
3631     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3632       break;
3633     case GST_STATE_CHANGE_PAUSED_TO_READY:
3634       GST_QUEUE2_MUTEX_LOCK (queue);
3635       if (!QUEUE_IS_USING_QUEUE (queue)) {
3636         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3637           gst_queue2_close_temp_location_file (queue);
3638         } else if (queue->ring_buffer) {
3639           g_free (queue->ring_buffer);
3640           queue->ring_buffer = NULL;
3641         }
3642         clean_ranges (queue);
3643       }
3644       if (queue->starting_segment != NULL) {
3645         gst_event_unref (queue->starting_segment);
3646         queue->starting_segment = NULL;
3647       }
3648       gst_event_replace (&queue->stream_start_event, NULL);
3649       GST_QUEUE2_MUTEX_UNLOCK (queue);
3650       break;
3651     case GST_STATE_CHANGE_READY_TO_NULL:
3652       break;
3653     default:
3654       break;
3655   }
3656
3657   return ret;
3658 }
3659
3660 /* changing the capacity of the queue must wake up
3661  * the _chain function, it might have more room now
3662  * to store the buffer/event in the queue */
3663 #define QUEUE_CAPACITY_CHANGE(q) \
3664   GST_QUEUE2_SIGNAL_DEL (queue); \
3665   if (queue->use_buffering)      \
3666     update_buffering (queue);
3667
3668 /* Changing the minimum required fill level must
3669  * wake up the _loop function as it might now
3670  * be able to preceed.
3671  */
3672 #define QUEUE_THRESHOLD_CHANGE(q)\
3673   GST_QUEUE2_SIGNAL_ADD (queue);
3674
3675 static void
3676 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3677 {
3678   GstState state;
3679
3680   /* the element must be stopped in order to do this */
3681   GST_OBJECT_LOCK (queue);
3682   state = GST_STATE (queue);
3683   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3684     goto wrong_state;
3685   GST_OBJECT_UNLOCK (queue);
3686
3687   /* set new location */
3688   g_free (queue->temp_template);
3689   queue->temp_template = g_strdup (template);
3690
3691   return;
3692
3693 /* ERROR */
3694 wrong_state:
3695   {
3696     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3697     GST_OBJECT_UNLOCK (queue);
3698   }
3699 }
3700
3701 static void
3702 gst_queue2_set_property (GObject * object,
3703     guint prop_id, const GValue * value, GParamSpec * pspec)
3704 {
3705   GstQueue2 *queue = GST_QUEUE2 (object);
3706
3707   /* someone could change levels here, and since this
3708    * affects the get/put funcs, we need to lock for safety. */
3709   GST_QUEUE2_MUTEX_LOCK (queue);
3710
3711   switch (prop_id) {
3712     case PROP_MAX_SIZE_BYTES:
3713       queue->max_level.bytes = g_value_get_uint (value);
3714       QUEUE_CAPACITY_CHANGE (queue);
3715       break;
3716     case PROP_MAX_SIZE_BUFFERS:
3717       queue->max_level.buffers = g_value_get_uint (value);
3718       QUEUE_CAPACITY_CHANGE (queue);
3719       break;
3720     case PROP_MAX_SIZE_TIME:
3721       queue->max_level.time = g_value_get_uint64 (value);
3722       /* set rate_time to the same value. We use an extra field in the level
3723        * structure so that we can easily access and compare it */
3724       queue->max_level.rate_time = queue->max_level.time;
3725       QUEUE_CAPACITY_CHANGE (queue);
3726       break;
3727     case PROP_USE_BUFFERING:
3728       queue->use_buffering = g_value_get_boolean (value);
3729       if (!queue->use_buffering && queue->is_buffering) {
3730         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3731             "posting 100%% message");
3732         SET_PERCENT (queue, 100);
3733         queue->is_buffering = FALSE;
3734       }
3735
3736       if (queue->use_buffering) {
3737         queue->is_buffering = TRUE;
3738         update_buffering (queue);
3739       }
3740       break;
3741     case PROP_USE_TAGS_BITRATE:
3742       queue->use_tags_bitrate = g_value_get_boolean (value);
3743       break;
3744     case PROP_USE_RATE_ESTIMATE:
3745       queue->use_rate_estimate = g_value_get_boolean (value);
3746       break;
3747     case PROP_LOW_PERCENT:
3748       queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3749       if (queue->is_buffering)
3750         update_buffering (queue);
3751       break;
3752     case PROP_HIGH_PERCENT:
3753       queue->high_watermark =
3754           g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3755       if (queue->is_buffering)
3756         update_buffering (queue);
3757       break;
3758     case PROP_LOW_WATERMARK:
3759       queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3760       if (queue->is_buffering)
3761         update_buffering (queue);
3762       break;
3763     case PROP_HIGH_WATERMARK:
3764       queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3765       if (queue->is_buffering)
3766         update_buffering (queue);
3767       break;
3768     case PROP_TEMP_TEMPLATE:
3769       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3770       break;
3771     case PROP_TEMP_REMOVE:
3772       queue->temp_remove = g_value_get_boolean (value);
3773       break;
3774     case PROP_RING_BUFFER_MAX_SIZE:
3775       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3776       break;
3777     default:
3778       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3779       break;
3780   }
3781
3782   GST_QUEUE2_MUTEX_UNLOCK (queue);
3783   gst_queue2_post_buffering (queue);
3784 }
3785
3786 static void
3787 gst_queue2_get_property (GObject * object,
3788     guint prop_id, GValue * value, GParamSpec * pspec)
3789 {
3790   GstQueue2 *queue = GST_QUEUE2 (object);
3791
3792   GST_QUEUE2_MUTEX_LOCK (queue);
3793
3794   switch (prop_id) {
3795     case PROP_CUR_LEVEL_BYTES:
3796       g_value_set_uint (value, queue->cur_level.bytes);
3797       break;
3798     case PROP_CUR_LEVEL_BUFFERS:
3799       g_value_set_uint (value, queue->cur_level.buffers);
3800       break;
3801     case PROP_CUR_LEVEL_TIME:
3802       g_value_set_uint64 (value, queue->cur_level.time);
3803       break;
3804     case PROP_MAX_SIZE_BYTES:
3805       g_value_set_uint (value, queue->max_level.bytes);
3806       break;
3807     case PROP_MAX_SIZE_BUFFERS:
3808       g_value_set_uint (value, queue->max_level.buffers);
3809       break;
3810     case PROP_MAX_SIZE_TIME:
3811       g_value_set_uint64 (value, queue->max_level.time);
3812       break;
3813     case PROP_USE_BUFFERING:
3814       g_value_set_boolean (value, queue->use_buffering);
3815       break;
3816     case PROP_USE_TAGS_BITRATE:
3817       g_value_set_boolean (value, queue->use_tags_bitrate);
3818       break;
3819     case PROP_USE_RATE_ESTIMATE:
3820       g_value_set_boolean (value, queue->use_rate_estimate);
3821       break;
3822     case PROP_LOW_PERCENT:
3823       g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
3824       break;
3825     case PROP_HIGH_PERCENT:
3826       g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
3827       break;
3828     case PROP_LOW_WATERMARK:
3829       g_value_set_double (value, queue->low_watermark /
3830           (gdouble) MAX_BUFFERING_LEVEL);
3831       break;
3832     case PROP_HIGH_WATERMARK:
3833       g_value_set_double (value, queue->high_watermark /
3834           (gdouble) MAX_BUFFERING_LEVEL);
3835       break;
3836     case PROP_TEMP_TEMPLATE:
3837       g_value_set_string (value, queue->temp_template);
3838       break;
3839     case PROP_TEMP_LOCATION:
3840       g_value_set_string (value, queue->temp_location);
3841       break;
3842     case PROP_TEMP_REMOVE:
3843       g_value_set_boolean (value, queue->temp_remove);
3844       break;
3845     case PROP_RING_BUFFER_MAX_SIZE:
3846       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3847       break;
3848     case PROP_AVG_IN_RATE:
3849     {
3850       gdouble in_rate = queue->byte_in_rate;
3851
3852       /* During the first RATE_INTERVAL, byte_in_rate will not have been
3853        * calculated, so calculate it here. */
3854       if (in_rate == 0.0 && queue->bytes_in
3855           && queue->last_update_in_rates_elapsed > 0.0)
3856         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
3857
3858       g_value_set_int64 (value, (gint64) in_rate);
3859       break;
3860     }
3861     default:
3862       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3863       break;
3864   }
3865
3866   GST_QUEUE2_MUTEX_UNLOCK (queue);
3867 }