queue2: Don't report 0% unless empty
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue2:current-level-buffers property.
40  *
41  * The default queue size limits are 100 buffers, 2MB of data, or
42  * two seconds worth of data, whichever is reached first.
43  *
44  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
45  * will allocate a random free filename and buffer data in the file.
46  * By using this, it will buffer the entire stream data on the file independently
47  * of the queue size limits, they will only be used for buffering statistics.
48  *
49  * The temp-location property will be used to notify the application of the
50  * allocated filename.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56
57 #include "gstqueue2.h"
58
59 #include <glib/gstdio.h>
60
61 #include "gst/gst-i18n-lib.h"
62 #include "gst/glib-compat-private.h"
63
64 #include <string.h>
65
66 #ifdef G_OS_WIN32
67 #include <io.h>                 /* lseek, open, close, read */
68 #undef lseek
69 #define lseek _lseeki64
70 #undef off_t
71 #define off_t guint64
72 #else
73 #include <unistd.h>
74 #endif
75
76 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
77     GST_PAD_SINK,
78     GST_PAD_ALWAYS,
79     GST_STATIC_CAPS_ANY);
80
81 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
82     GST_PAD_SRC,
83     GST_PAD_ALWAYS,
84     GST_STATIC_CAPS_ANY);
85
86 GST_DEBUG_CATEGORY_STATIC (queue_debug);
87 #define GST_CAT_DEFAULT (queue_debug)
88 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
89
90 enum
91 {
92   SIGNAL_OVERRUN,
93   LAST_SIGNAL
94 };
95
96 /* other defines */
97 #define DEFAULT_BUFFER_SIZE 4096
98 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
99 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
100 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
101
102 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
103
104 /* default property values */
105 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
106 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
107 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
108 #define DEFAULT_USE_BUFFERING      FALSE
109 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
110 #define DEFAULT_LOW_PERCENT        10
111 #define DEFAULT_HIGH_PERCENT       99
112 #define DEFAULT_TEMP_REMOVE        TRUE
113 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
114
115 enum
116 {
117   PROP_0,
118   PROP_CUR_LEVEL_BUFFERS,
119   PROP_CUR_LEVEL_BYTES,
120   PROP_CUR_LEVEL_TIME,
121   PROP_MAX_SIZE_BUFFERS,
122   PROP_MAX_SIZE_BYTES,
123   PROP_MAX_SIZE_TIME,
124   PROP_USE_BUFFERING,
125   PROP_USE_RATE_ESTIMATE,
126   PROP_LOW_PERCENT,
127   PROP_HIGH_PERCENT,
128   PROP_TEMP_TEMPLATE,
129   PROP_TEMP_LOCATION,
130   PROP_TEMP_REMOVE,
131   PROP_RING_BUFFER_MAX_SIZE,
132   PROP_AVG_IN_RATE,
133   PROP_LAST
134 };
135
136 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
137   l.buffers = 0;                                        \
138   l.bytes = 0;                                          \
139   l.time = 0;                                           \
140   l.rate_time = 0;                                      \
141 } G_STMT_END
142
143 #define STATUS(queue, pad, msg) \
144   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
145                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
146                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
147                       " ns, %"G_GUINT64_FORMAT" items", \
148                       GST_DEBUG_PAD_NAME (pad), \
149                       queue->cur_level.buffers, \
150                       queue->max_level.buffers, \
151                       queue->cur_level.bytes, \
152                       queue->max_level.bytes, \
153                       queue->cur_level.time, \
154                       queue->max_level.time, \
155                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
156                         queue->current->writing_pos - queue->current->max_reading_pos : \
157                         queue->queue.length))
158
159 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
160   g_mutex_lock (&q->qlock);                                              \
161 } G_STMT_END
162
163 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
164   GST_QUEUE2_MUTEX_LOCK (q);                                            \
165   if (res != GST_FLOW_OK)                                               \
166     goto label;                                                         \
167 } G_STMT_END
168
169 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
170   g_mutex_unlock (&q->qlock);                                            \
171 } G_STMT_END
172
173 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
174   STATUS (queue, q->sinkpad, "wait for DEL");                           \
175   q->waiting_del = TRUE;                                                \
176   g_cond_wait (&q->item_del, &queue->qlock);                              \
177   q->waiting_del = FALSE;                                               \
178   if (res != GST_FLOW_OK) {                                             \
179     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
180     goto label;                                                         \
181   }                                                                     \
182   STATUS (queue, q->sinkpad, "received DEL");                           \
183 } G_STMT_END
184
185 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
186   STATUS (queue, q->srcpad, "wait for ADD");                            \
187   q->waiting_add = TRUE;                                                \
188   g_cond_wait (&q->item_add, &q->qlock);                                  \
189   q->waiting_add = FALSE;                                               \
190   if (res != GST_FLOW_OK) {                                             \
191     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
192     goto label;                                                         \
193   }                                                                     \
194   STATUS (queue, q->srcpad, "received ADD");                            \
195 } G_STMT_END
196
197 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
198   if (q->waiting_del) {                                                 \
199     STATUS (q, q->srcpad, "signal DEL");                                \
200     g_cond_signal (&q->item_del);                                        \
201   }                                                                     \
202 } G_STMT_END
203
204 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
205   if (q->waiting_add) {                                                 \
206     STATUS (q, q->sinkpad, "signal ADD");                               \
207     g_cond_signal (&q->item_add);                                        \
208   }                                                                     \
209 } G_STMT_END
210
211 #define SET_PERCENT(q, perc) G_STMT_START {                              \
212   if (perc != q->buffering_percent) {                                    \
213     q->buffering_percent = perc;                                         \
214     q->percent_changed = TRUE;                                           \
215     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
216     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
217         &q->buffering_left);                                             \
218   }                                                                      \
219 } G_STMT_END
220
221 #define _do_init \
222     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
223     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
224         "dataflow inside the queue element");
225 #define gst_queue2_parent_class parent_class
226 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
227
228 static void gst_queue2_finalize (GObject * object);
229
230 static void gst_queue2_set_property (GObject * object,
231     guint prop_id, const GValue * value, GParamSpec * pspec);
232 static void gst_queue2_get_property (GObject * object,
233     guint prop_id, GValue * value, GParamSpec * pspec);
234
235 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
236     GstBuffer * buffer);
237 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
238     GstBufferList * buffer_list);
239 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
240 static void gst_queue2_loop (GstPad * pad);
241
242 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
243     GstEvent * event);
244 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
245     GstQuery * query);
246
247 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
248     GstEvent * event);
249 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
250     GstQuery * query);
251 static gboolean gst_queue2_handle_query (GstElement * element,
252     GstQuery * query);
253
254 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
255     guint64 offset, guint length, GstBuffer ** buffer);
256
257 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
258     GstPadMode mode, gboolean active);
259 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
260     GstPadMode mode, gboolean active);
261 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
262     GstStateChange transition);
263
264 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
265 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
266
267 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
268 static void update_in_rates (GstQueue2 * queue);
269 static void gst_queue2_post_buffering (GstQueue2 * queue);
270
271 typedef enum
272 {
273   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
274   GST_QUEUE2_ITEM_TYPE_BUFFER,
275   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
276   GST_QUEUE2_ITEM_TYPE_EVENT,
277   GST_QUEUE2_ITEM_TYPE_QUERY
278 } GstQueue2ItemType;
279
280 typedef struct
281 {
282   GstQueue2ItemType type;
283   GstMiniObject *item;
284 } GstQueue2Item;
285
286 static guint gst_queue2_signals[LAST_SIGNAL] = { 0 };
287
288 static void
289 gst_queue2_class_init (GstQueue2Class * klass)
290 {
291   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
292   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
293
294   gobject_class->set_property = gst_queue2_set_property;
295   gobject_class->get_property = gst_queue2_get_property;
296
297   /* signals */
298   /**
299    * GstQueue2::overrun:
300    * @queue: the queue2 instance
301    *
302    * Reports that the buffer became full (overrun).
303    * A buffer is full if the total amount of data inside it (num-buffers, time,
304    * size) is higher than the boundary values which can be set through the
305    * GObject properties.
306    *
307    * Since: 1.8
308    */
309   gst_queue2_signals[SIGNAL_OVERRUN] =
310       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
311       G_STRUCT_OFFSET (GstQueue2Class, overrun), NULL, NULL,
312       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
313
314   /* properties */
315   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
316       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
317           "Current amount of data in the queue (bytes)",
318           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
319   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
320       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
321           "Current number of buffers in the queue",
322           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
323   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
324       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
325           "Current amount of data in the queue (in ns)",
326           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
327
328   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
329       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
330           "Max. amount of data in the queue (bytes, 0=disable)",
331           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
332           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
333           G_PARAM_STATIC_STRINGS));
334   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
335       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
336           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
337           DEFAULT_MAX_SIZE_BUFFERS,
338           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
339           G_PARAM_STATIC_STRINGS));
340   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
341       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
342           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
343           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
344           G_PARAM_STATIC_STRINGS));
345
346   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
347       g_param_spec_boolean ("use-buffering", "Use buffering",
348           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
349           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
350           G_PARAM_STATIC_STRINGS));
351   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
352       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
353           "Estimate the bitrate of the stream to calculate time level",
354           DEFAULT_USE_RATE_ESTIMATE,
355           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
357       g_param_spec_int ("low-percent", "Low percent",
358           "Low threshold for buffering to start. Only used if use-buffering is True",
359           0, 100, DEFAULT_LOW_PERCENT,
360           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
361   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
362       g_param_spec_int ("high-percent", "High percent",
363           "High threshold for buffering to finish. Only used if use-buffering is True",
364           0, 100, DEFAULT_HIGH_PERCENT,
365           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
366
367   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
368       g_param_spec_string ("temp-template", "Temporary File Template",
369           "File template to store temporary files in, should contain directory "
370           "and XXXXXX. (NULL == disabled)",
371           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
372
373   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
374       g_param_spec_string ("temp-location", "Temporary File Location",
375           "Location to store temporary files in (Only read this property, "
376           "use temp-template to configure the name template)",
377           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
378
379   /**
380    * GstQueue2:temp-remove
381    *
382    * When temp-template is set, remove the temporary file when going to READY.
383    */
384   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
385       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
386           "Remove the temp-location after use",
387           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
388
389   /**
390    * GstQueue2:ring-buffer-max-size
391    *
392    * The maximum size of the ring buffer in bytes. If set to 0, the ring
393    * buffer is disabled. Default 0.
394    */
395   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
396       g_param_spec_uint64 ("ring-buffer-max-size",
397           "Max. ring buffer size (bytes)",
398           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
399           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
400           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
401
402   /**
403    * GstQueue2:avg-in-rate
404    *
405    * The average input data rate.
406    */
407   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
408       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
409           "Average input data rate (bytes/s)",
410           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
411
412   /* set several parent class virtual functions */
413   gobject_class->finalize = gst_queue2_finalize;
414
415   gst_element_class_add_pad_template (gstelement_class,
416       gst_static_pad_template_get (&srctemplate));
417   gst_element_class_add_pad_template (gstelement_class,
418       gst_static_pad_template_get (&sinktemplate));
419
420   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
421       "Generic",
422       "Simple data queue",
423       "Erik Walthinsen <omega@cse.ogi.edu>, "
424       "Wim Taymans <wim.taymans@gmail.com>");
425
426   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
427   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
428 }
429
430 static void
431 gst_queue2_init (GstQueue2 * queue)
432 {
433   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
434
435   gst_pad_set_chain_function (queue->sinkpad,
436       GST_DEBUG_FUNCPTR (gst_queue2_chain));
437   gst_pad_set_chain_list_function (queue->sinkpad,
438       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
439   gst_pad_set_activatemode_function (queue->sinkpad,
440       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
441   gst_pad_set_event_function (queue->sinkpad,
442       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
443   gst_pad_set_query_function (queue->sinkpad,
444       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
445   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
446   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
447
448   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
449
450   gst_pad_set_activatemode_function (queue->srcpad,
451       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
452   gst_pad_set_getrange_function (queue->srcpad,
453       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
454   gst_pad_set_event_function (queue->srcpad,
455       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
456   gst_pad_set_query_function (queue->srcpad,
457       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
458   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
459   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
460
461   /* levels */
462   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
463   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
464   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
465   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
466   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
467   queue->use_buffering = DEFAULT_USE_BUFFERING;
468   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
469   queue->low_percent = DEFAULT_LOW_PERCENT;
470   queue->high_percent = DEFAULT_HIGH_PERCENT;
471
472   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
473   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
474
475   queue->sinktime = GST_CLOCK_TIME_NONE;
476   queue->srctime = GST_CLOCK_TIME_NONE;
477   queue->sink_tainted = TRUE;
478   queue->src_tainted = TRUE;
479
480   queue->srcresult = GST_FLOW_FLUSHING;
481   queue->sinkresult = GST_FLOW_FLUSHING;
482   queue->is_eos = FALSE;
483   queue->in_timer = g_timer_new ();
484   queue->out_timer = g_timer_new ();
485
486   g_mutex_init (&queue->qlock);
487   queue->waiting_add = FALSE;
488   g_cond_init (&queue->item_add);
489   queue->waiting_del = FALSE;
490   g_cond_init (&queue->item_del);
491   g_queue_init (&queue->queue);
492
493   g_cond_init (&queue->query_handled);
494   queue->last_query = FALSE;
495
496   g_mutex_init (&queue->buffering_post_lock);
497   queue->buffering_percent = 100;
498
499   /* tempfile related */
500   queue->temp_template = NULL;
501   queue->temp_location = NULL;
502   queue->temp_remove = DEFAULT_TEMP_REMOVE;
503
504   queue->ring_buffer = NULL;
505   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
506
507   GST_DEBUG_OBJECT (queue,
508       "initialized queue's not_empty & not_full conditions");
509 }
510
511 /* called only once, as opposed to dispose */
512 static void
513 gst_queue2_finalize (GObject * object)
514 {
515   GstQueue2 *queue = GST_QUEUE2 (object);
516
517   GST_DEBUG_OBJECT (queue, "finalizing queue");
518
519   while (!g_queue_is_empty (&queue->queue)) {
520     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
521
522     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
523       gst_mini_object_unref (qitem->item);
524     g_slice_free (GstQueue2Item, qitem);
525   }
526
527   queue->last_query = FALSE;
528   g_queue_clear (&queue->queue);
529   g_mutex_clear (&queue->qlock);
530   g_mutex_clear (&queue->buffering_post_lock);
531   g_cond_clear (&queue->item_add);
532   g_cond_clear (&queue->item_del);
533   g_cond_clear (&queue->query_handled);
534   g_timer_destroy (queue->in_timer);
535   g_timer_destroy (queue->out_timer);
536
537   /* temp_file path cleanup  */
538   g_free (queue->temp_template);
539   g_free (queue->temp_location);
540
541   G_OBJECT_CLASS (parent_class)->finalize (object);
542 }
543
544 static void
545 debug_ranges (GstQueue2 * queue)
546 {
547   GstQueue2Range *walk;
548
549   for (walk = queue->ranges; walk; walk = walk->next) {
550     GST_DEBUG_OBJECT (queue,
551         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
552         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
553         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
554         walk->rb_writing_pos, walk->reading_pos,
555         walk == queue->current ? "**y**" : "  n  ");
556   }
557 }
558
559 /* clear all the downloaded ranges */
560 static void
561 clean_ranges (GstQueue2 * queue)
562 {
563   GST_DEBUG_OBJECT (queue, "clean queue ranges");
564
565   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
566   queue->ranges = NULL;
567   queue->current = NULL;
568 }
569
570 /* find a range that contains @offset or NULL when nothing does */
571 static GstQueue2Range *
572 find_range (GstQueue2 * queue, guint64 offset)
573 {
574   GstQueue2Range *range = NULL;
575   GstQueue2Range *walk;
576
577   /* first do a quick check for the current range */
578   for (walk = queue->ranges; walk; walk = walk->next) {
579     if (offset >= walk->offset && offset <= walk->writing_pos) {
580       /* we can reuse an existing range */
581       range = walk;
582       break;
583     }
584   }
585   if (range) {
586     GST_DEBUG_OBJECT (queue,
587         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
588         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
589   } else {
590     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
591   }
592   return range;
593 }
594
595 static void
596 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
597 {
598   guint64 max_reading_pos, writing_pos;
599
600   writing_pos = range->writing_pos;
601   max_reading_pos = range->max_reading_pos;
602
603   if (writing_pos > max_reading_pos)
604     queue->cur_level.bytes = writing_pos - max_reading_pos;
605   else
606     queue->cur_level.bytes = 0;
607 }
608
609 /* make a new range for @offset or reuse an existing range */
610 static GstQueue2Range *
611 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
612 {
613   GstQueue2Range *range, *prev, *next;
614
615   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
616
617   if ((range = find_range (queue, offset))) {
618     GST_DEBUG_OBJECT (queue,
619         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
620         range->writing_pos);
621     if (update_existing && range->writing_pos != offset) {
622       GST_DEBUG_OBJECT (queue, "updating range writing position to "
623           "%" G_GUINT64_FORMAT, offset);
624       range->writing_pos = offset;
625     }
626   } else {
627     GST_DEBUG_OBJECT (queue,
628         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
629
630     range = g_slice_new0 (GstQueue2Range);
631     range->offset = offset;
632     /* we want to write to the next location in the ring buffer */
633     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
634     range->writing_pos = offset;
635     range->rb_writing_pos = range->rb_offset;
636     range->reading_pos = offset;
637     range->max_reading_pos = offset;
638
639     /* insert sorted */
640     prev = NULL;
641     next = queue->ranges;
642     while (next) {
643       if (next->offset > offset) {
644         /* insert before next */
645         GST_DEBUG_OBJECT (queue,
646             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
647             next->offset);
648         break;
649       }
650       /* try next */
651       prev = next;
652       next = next->next;
653     }
654     range->next = next;
655     if (prev)
656       prev->next = range;
657     else
658       queue->ranges = range;
659   }
660   debug_ranges (queue);
661
662   /* update the stats for this range */
663   update_cur_level (queue, range);
664
665   return range;
666 }
667
668
669 /* clear and init the download ranges for offset 0 */
670 static void
671 init_ranges (GstQueue2 * queue)
672 {
673   GST_DEBUG_OBJECT (queue, "init queue ranges");
674
675   /* get rid of all the current ranges */
676   clean_ranges (queue);
677   /* make a range for offset 0 */
678   queue->current = add_range (queue, 0, TRUE);
679 }
680
681 /* calculate the diff between running time on the sink and src of the queue.
682  * This is the total amount of time in the queue. */
683 static void
684 update_time_level (GstQueue2 * queue)
685 {
686   if (queue->sink_tainted) {
687     queue->sinktime =
688         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
689         queue->sink_segment.position);
690     queue->sink_tainted = FALSE;
691   }
692
693   if (queue->src_tainted) {
694     queue->srctime =
695         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
696         queue->src_segment.position);
697     queue->src_tainted = FALSE;
698   }
699
700   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
701       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
702
703   if (queue->sinktime != GST_CLOCK_TIME_NONE
704       && queue->srctime != GST_CLOCK_TIME_NONE
705       && queue->sinktime >= queue->srctime)
706     queue->cur_level.time = queue->sinktime - queue->srctime;
707   else
708     queue->cur_level.time = 0;
709 }
710
711 /* take a SEGMENT event and apply the values to segment, updating the time
712  * level of queue. */
713 static void
714 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
715     gboolean is_sink)
716 {
717   gst_event_copy_segment (event, segment);
718
719   if (segment->format == GST_FORMAT_BYTES) {
720     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
721       /* start is where we'll be getting from and as such writing next */
722       queue->current = add_range (queue, segment->start, TRUE);
723     }
724   }
725
726   /* now configure the values, we use these to track timestamps on the
727    * sinkpad. */
728   if (segment->format != GST_FORMAT_TIME) {
729     /* non-time format, pretent the current time segment is closed with a
730      * 0 start and unknown stop time. */
731     segment->format = GST_FORMAT_TIME;
732     segment->start = 0;
733     segment->stop = -1;
734     segment->time = 0;
735   }
736
737   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
738
739   if (is_sink)
740     queue->sink_tainted = TRUE;
741   else
742     queue->src_tainted = TRUE;
743
744   /* segment can update the time level of the queue */
745   update_time_level (queue);
746 }
747
748 static void
749 apply_gap (GstQueue2 * queue, GstEvent * event,
750     GstSegment * segment, gboolean is_sink)
751 {
752   GstClockTime timestamp;
753   GstClockTime duration;
754
755   gst_event_parse_gap (event, &timestamp, &duration);
756
757   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
758
759     if (GST_CLOCK_TIME_IS_VALID (duration)) {
760       timestamp += duration;
761     }
762
763     segment->position = timestamp;
764
765     if (is_sink)
766       queue->sink_tainted = TRUE;
767     else
768       queue->src_tainted = TRUE;
769
770     /* calc diff with other end */
771     update_time_level (queue);
772   }
773 }
774
775 /* take a buffer and update segment, updating the time level of the queue. */
776 static void
777 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
778     gboolean is_sink)
779 {
780   GstClockTime duration, timestamp;
781
782   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
783   duration = GST_BUFFER_DURATION (buffer);
784
785   /* if no timestamp is set, assume it's continuous with the previous
786    * time */
787   if (timestamp == GST_CLOCK_TIME_NONE)
788     timestamp = segment->position;
789
790   /* add duration */
791   if (duration != GST_CLOCK_TIME_NONE)
792     timestamp += duration;
793
794   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
795       GST_TIME_ARGS (timestamp));
796
797   segment->position = timestamp;
798
799   if (is_sink)
800     queue->sink_tainted = TRUE;
801   else
802     queue->src_tainted = TRUE;
803
804   /* calc diff with other end */
805   update_time_level (queue);
806 }
807
808 static gboolean
809 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
810 {
811   GstClockTime *timestamp = data;
812   GstClockTime btime;
813
814   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
815       " duration %" GST_TIME_FORMAT, idx,
816       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
817       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
818       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
819
820   btime = GST_BUFFER_DTS_OR_PTS (*buf);
821   if (GST_CLOCK_TIME_IS_VALID (btime))
822     *timestamp = btime;
823
824   if (GST_BUFFER_DURATION_IS_VALID (*buf))
825     *timestamp += GST_BUFFER_DURATION (*buf);
826
827   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
828   return TRUE;
829 }
830
831 /* take a buffer list and update segment, updating the time level of the queue */
832 static void
833 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
834     GstSegment * segment, gboolean is_sink)
835 {
836   GstClockTime timestamp;
837
838   /* if no timestamp is set, assume it's continuous with the previous time */
839   timestamp = segment->position;
840
841   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
842
843   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
844       GST_TIME_ARGS (timestamp));
845
846   segment->position = timestamp;
847
848   if (is_sink)
849     queue->sink_tainted = TRUE;
850   else
851     queue->src_tainted = TRUE;
852
853   /* calc diff with other end */
854   update_time_level (queue);
855 }
856
857 static gboolean
858 get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering,
859     gint * percent)
860 {
861   gint perc;
862
863   if (queue->high_percent <= 0) {
864     if (percent)
865       *percent = 100;
866     if (is_buffering)
867       *is_buffering = FALSE;
868     return FALSE;
869   }
870 #define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
871
872   if (queue->is_eos) {
873     /* on EOS we are always 100% full, we set the var here so that it we can
874      * reuse the logic below to stop buffering */
875     perc = 100;
876     GST_LOG_OBJECT (queue, "we are EOS");
877   } else {
878     GST_LOG_OBJECT (queue,
879         "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
880         queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
881         queue->cur_level.buffers);
882
883     /* figure out the percent we are filled, we take the max of all formats. */
884     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
885       perc = GET_PERCENT (bytes, 0);
886     } else {
887       guint64 rb_size = queue->ring_buffer_max_size;
888       perc = GET_PERCENT (bytes, rb_size);
889     }
890     perc = MAX (perc, GET_PERCENT (time, 0));
891     perc = MAX (perc, GET_PERCENT (buffers, 0));
892
893     /* also apply the rate estimate when we need to */
894     if (queue->use_rate_estimate)
895       perc = MAX (perc, GET_PERCENT (rate_time, 0));
896
897     /* Don't get to 0% unless we're really empty */
898     if (queue->cur_level.bytes > 0)
899       perc = MAX (1, perc);
900   }
901 #undef GET_PERCENT
902
903   if (is_buffering)
904     *is_buffering = queue->is_buffering;
905
906   /* scale to high percent so that it becomes the 100% mark */
907   perc = perc * 100 / queue->high_percent;
908   /* clip */
909   if (perc > 100)
910     perc = 100;
911
912   if (percent)
913     *percent = perc;
914
915   GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering,
916       perc);
917
918   return TRUE;
919 }
920
921 static void
922 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
923     gint * avg_in, gint * avg_out, gint64 * buffering_left)
924 {
925   if (mode) {
926     if (!QUEUE_IS_USING_QUEUE (queue)) {
927       if (QUEUE_IS_USING_RING_BUFFER (queue))
928         *mode = GST_BUFFERING_TIMESHIFT;
929       else
930         *mode = GST_BUFFERING_DOWNLOAD;
931     } else {
932       *mode = GST_BUFFERING_STREAM;
933     }
934   }
935
936   if (avg_in)
937     *avg_in = queue->byte_in_rate;
938   if (avg_out)
939     *avg_out = queue->byte_out_rate;
940
941   if (buffering_left) {
942     *buffering_left = (percent == 100 ? 0 : -1);
943
944     if (queue->use_rate_estimate) {
945       guint64 max, cur;
946
947       max = queue->max_level.rate_time;
948       cur = queue->cur_level.rate_time;
949
950       if (percent != 100 && max > cur)
951         *buffering_left = (max - cur) / 1000000;
952     }
953   }
954 }
955
956 static void
957 gst_queue2_post_buffering (GstQueue2 * queue)
958 {
959   GstMessage *msg = NULL;
960
961   g_mutex_lock (&queue->buffering_post_lock);
962   GST_QUEUE2_MUTEX_LOCK (queue);
963   if (queue->percent_changed) {
964     gint percent = queue->buffering_percent;
965
966     queue->percent_changed = FALSE;
967
968     GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
969     msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
970
971     gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
972         queue->avg_out, queue->buffering_left);
973   }
974   GST_QUEUE2_MUTEX_UNLOCK (queue);
975
976   if (msg != NULL)
977     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
978
979   g_mutex_unlock (&queue->buffering_post_lock);
980 }
981
982 static void
983 update_buffering (GstQueue2 * queue)
984 {
985   gint percent;
986
987   /* Ensure the variables used to calculate buffering state are up-to-date. */
988   if (queue->current)
989     update_cur_level (queue, queue->current);
990   update_in_rates (queue);
991
992   if (!get_buffering_percent (queue, NULL, &percent))
993     return;
994
995   if (queue->is_buffering) {
996     /* if we were buffering see if we reached the high watermark */
997     if (percent >= 100)
998       queue->is_buffering = FALSE;
999
1000     SET_PERCENT (queue, percent);
1001   } else {
1002     /* we were not buffering, check if we need to start buffering if we drop
1003      * below the low threshold */
1004     if (percent < queue->low_percent) {
1005       queue->is_buffering = TRUE;
1006       SET_PERCENT (queue, percent);
1007     }
1008   }
1009 }
1010
1011 static void
1012 reset_rate_timer (GstQueue2 * queue)
1013 {
1014   queue->bytes_in = 0;
1015   queue->bytes_out = 0;
1016   queue->byte_in_rate = 0.0;
1017   queue->byte_in_period = 0;
1018   queue->byte_out_rate = 0.0;
1019   queue->last_update_in_rates_elapsed = 0.0;
1020   queue->last_in_elapsed = 0.0;
1021   queue->last_out_elapsed = 0.0;
1022   queue->in_timer_started = FALSE;
1023   queue->out_timer_started = FALSE;
1024 }
1025
1026 /* the interval in seconds to recalculate the rate */
1027 #define RATE_INTERVAL    0.2
1028 /* Tuning for rate estimation. We use a large window for the input rate because
1029  * it should be stable when connected to a network. The output rate is less
1030  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1031  * therefore adapt more quickly.
1032  * However, initial input rate may be subject to a burst, and should therefore
1033  * initially also adapt more quickly to changes, and only later on give higher
1034  * weight to previous values. */
1035 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1036 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1037
1038 static void
1039 update_in_rates (GstQueue2 * queue)
1040 {
1041   gdouble elapsed, period;
1042   gdouble byte_in_rate;
1043
1044   if (!queue->in_timer_started) {
1045     queue->in_timer_started = TRUE;
1046     g_timer_start (queue->in_timer);
1047     return;
1048   }
1049
1050   queue->last_update_in_rates_elapsed = elapsed =
1051       g_timer_elapsed (queue->in_timer, NULL);
1052
1053   /* recalc after each interval. */
1054   if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1055     period = elapsed - queue->last_in_elapsed;
1056
1057     GST_DEBUG_OBJECT (queue,
1058         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1059         period, queue->bytes_in, queue->byte_in_period);
1060
1061     byte_in_rate = queue->bytes_in / period;
1062
1063     if (queue->byte_in_rate == 0.0)
1064       queue->byte_in_rate = byte_in_rate;
1065     else
1066       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1067           (double) queue->byte_in_period, period);
1068
1069     /* another data point, cap at 16 for long time running average */
1070     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1071       queue->byte_in_period += period;
1072
1073     /* reset the values to calculate rate over the next interval */
1074     queue->last_in_elapsed = elapsed;
1075     queue->bytes_in = 0;
1076   }
1077
1078   if (queue->byte_in_rate > 0.0) {
1079     queue->cur_level.rate_time =
1080         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1081   }
1082   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1083       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1084 }
1085
1086 static void
1087 update_out_rates (GstQueue2 * queue)
1088 {
1089   gdouble elapsed, period;
1090   gdouble byte_out_rate;
1091
1092   if (!queue->out_timer_started) {
1093     queue->out_timer_started = TRUE;
1094     g_timer_start (queue->out_timer);
1095     return;
1096   }
1097
1098   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1099
1100   /* recalc after each interval. */
1101   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1102     period = elapsed - queue->last_out_elapsed;
1103
1104     GST_DEBUG_OBJECT (queue,
1105         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1106
1107     byte_out_rate = queue->bytes_out / period;
1108
1109     if (queue->byte_out_rate == 0.0)
1110       queue->byte_out_rate = byte_out_rate;
1111     else
1112       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1113
1114     /* reset the values to calculate rate over the next interval */
1115     queue->last_out_elapsed = elapsed;
1116     queue->bytes_out = 0;
1117   }
1118   if (queue->byte_in_rate > 0.0) {
1119     queue->cur_level.rate_time =
1120         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1121   }
1122   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1123       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1124 }
1125
1126 static void
1127 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1128 {
1129   guint64 reading_pos, max_reading_pos;
1130
1131   reading_pos = pos;
1132   max_reading_pos = range->max_reading_pos;
1133
1134   max_reading_pos = MAX (max_reading_pos, reading_pos);
1135
1136   GST_DEBUG_OBJECT (queue,
1137       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1138       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1139   range->max_reading_pos = max_reading_pos;
1140
1141   update_cur_level (queue, range);
1142 }
1143
1144 static gboolean
1145 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1146 {
1147   GstEvent *event;
1148   gboolean res;
1149
1150   /* until we receive the FLUSH_STOP from this seek, we skip data */
1151   queue->seeking = TRUE;
1152   GST_QUEUE2_MUTEX_UNLOCK (queue);
1153
1154   debug_ranges (queue);
1155
1156   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1157
1158   event =
1159       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1160       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1161       GST_SEEK_TYPE_NONE, -1);
1162
1163   res = gst_pad_push_event (queue->sinkpad, event);
1164   GST_QUEUE2_MUTEX_LOCK (queue);
1165
1166   if (res) {
1167     /* Between us sending the seek event and re-acquiring the lock, the source
1168      * thread might already have pushed data and moved along the range's
1169      * writing_pos beyond the seek offset. In that case we don't want to set
1170      * the writing position back to the requested seek position, as it would
1171      * cause data to be written to the wrong offset in the file or ring buffer.
1172      * We still do the add_range call to switch the current range to the
1173      * requested range, or create one if one doesn't exist yet. */
1174     queue->current = add_range (queue, offset, FALSE);
1175   }
1176
1177   return res;
1178 }
1179
1180 /* get the threshold for when we decide to seek rather than wait */
1181 static guint64
1182 get_seek_threshold (GstQueue2 * queue)
1183 {
1184   guint64 threshold;
1185
1186   /* FIXME, find a good threshold based on the incoming rate. */
1187   threshold = 1024 * 512;
1188
1189   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1190     threshold = MIN (threshold,
1191         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1192   }
1193   return threshold;
1194 }
1195
1196 /* see if there is enough data in the file to read a full buffer */
1197 static gboolean
1198 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1199 {
1200   GstQueue2Range *range;
1201
1202   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1203       offset, length);
1204
1205   if ((range = find_range (queue, offset))) {
1206     if (queue->current != range) {
1207       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1208       perform_seek_to_offset (queue, range->writing_pos);
1209     }
1210
1211     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1212         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1213
1214     /* we have a range for offset */
1215     GST_DEBUG_OBJECT (queue,
1216         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1217         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1218
1219     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1220       return TRUE;
1221
1222     if (offset + length <= range->writing_pos)
1223       return TRUE;
1224     else
1225       GST_DEBUG_OBJECT (queue,
1226           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1227           (offset + length) - range->writing_pos);
1228
1229   } else {
1230     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1231         " len %u", offset, length);
1232     /* we don't have the range, see how far away we are */
1233     if (!queue->is_eos && queue->current) {
1234       guint64 threshold = get_seek_threshold (queue);
1235
1236       if (offset >= queue->current->offset && offset <=
1237           queue->current->writing_pos + threshold) {
1238         GST_INFO_OBJECT (queue,
1239             "requested data is within range, wait for data");
1240         return FALSE;
1241       }
1242     }
1243
1244     /* too far away, do a seek */
1245     perform_seek_to_offset (queue, offset);
1246   }
1247
1248   return FALSE;
1249 }
1250
1251 #ifdef HAVE_FSEEKO
1252 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1253 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1254 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1255 #else
1256 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1257 #endif
1258
1259 static GstFlowReturn
1260 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1261     guint8 * dst, gint64 * read_return)
1262 {
1263   guint8 *ring_buffer;
1264   size_t res;
1265
1266   ring_buffer = queue->ring_buffer;
1267
1268   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1269     goto seek_failed;
1270
1271   /* this should not block */
1272   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1273       length, offset);
1274   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1275     res = fread (dst, 1, length, queue->temp_file);
1276   } else {
1277     memcpy (dst, ring_buffer + offset, length);
1278     res = length;
1279   }
1280
1281   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1282
1283   if (G_UNLIKELY (res < length)) {
1284     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1285       goto could_not_read;
1286     /* check for errors or EOF */
1287     if (ferror (queue->temp_file))
1288       goto could_not_read;
1289     if (feof (queue->temp_file) && length > 0)
1290       goto eos;
1291   }
1292
1293   *read_return = res;
1294
1295   return GST_FLOW_OK;
1296
1297 seek_failed:
1298   {
1299     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1300     return GST_FLOW_ERROR;
1301   }
1302 could_not_read:
1303   {
1304     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1305     return GST_FLOW_ERROR;
1306   }
1307 eos:
1308   {
1309     GST_DEBUG ("non-regular file hits EOS");
1310     return GST_FLOW_EOS;
1311   }
1312 }
1313
1314 static GstFlowReturn
1315 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1316     GstBuffer ** buffer)
1317 {
1318   GstBuffer *buf;
1319   GstMapInfo info;
1320   guint8 *data;
1321   guint64 file_offset;
1322   guint block_length, remaining, read_length;
1323   guint64 rb_size;
1324   guint64 max_size;
1325   guint64 rpos;
1326   GstFlowReturn ret = GST_FLOW_OK;
1327
1328   /* allocate the output buffer of the requested size */
1329   if (*buffer == NULL)
1330     buf = gst_buffer_new_allocate (NULL, length, NULL);
1331   else
1332     buf = *buffer;
1333
1334   gst_buffer_map (buf, &info, GST_MAP_WRITE);
1335   data = info.data;
1336
1337   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1338       offset);
1339
1340   rpos = offset;
1341   rb_size = queue->ring_buffer_max_size;
1342   max_size = QUEUE_MAX_BYTES (queue);
1343
1344   remaining = length;
1345   while (remaining > 0) {
1346     /* configure how much/whether to read */
1347     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1348       read_length = 0;
1349
1350       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1351         guint64 level;
1352
1353         /* calculate how far away the offset is */
1354         if (queue->current->writing_pos > rpos)
1355           level = queue->current->writing_pos - rpos;
1356         else
1357           level = 0;
1358
1359         GST_DEBUG_OBJECT (queue,
1360             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1361             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1362             rpos, queue->current->writing_pos, level, max_size);
1363
1364         if (level >= max_size) {
1365           /* we don't have the data but if we have a ring buffer that is full, we
1366            * need to read */
1367           GST_DEBUG_OBJECT (queue,
1368               "ring buffer full, reading QUEUE_MAX_BYTES %"
1369               G_GUINT64_FORMAT " bytes", max_size);
1370           read_length = max_size;
1371         } else if (queue->is_eos) {
1372           /* won't get any more data so read any data we have */
1373           if (level) {
1374             GST_DEBUG_OBJECT (queue,
1375                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1376                 level);
1377             read_length = level;
1378             remaining = level;
1379             length = level;
1380           } else
1381             goto hit_eos;
1382         }
1383       }
1384
1385       if (read_length == 0) {
1386         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1387           GST_DEBUG_OBJECT (queue,
1388               "update current position [%" G_GUINT64_FORMAT "-%"
1389               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1390           update_cur_pos (queue, queue->current, rpos);
1391           GST_QUEUE2_SIGNAL_DEL (queue);
1392         }
1393
1394         if (queue->use_buffering)
1395           update_buffering (queue);
1396
1397         GST_DEBUG_OBJECT (queue, "waiting for add");
1398         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1399         continue;
1400       }
1401     } else {
1402       /* we have the requested data so read it */
1403       read_length = remaining;
1404     }
1405
1406     /* set range reading_pos to actual reading position for this read */
1407     queue->current->reading_pos = rpos;
1408
1409     /* configure how much and from where to read */
1410     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1411       file_offset =
1412           (queue->current->rb_offset + (rpos -
1413               queue->current->offset)) % rb_size;
1414       if (file_offset + read_length > rb_size) {
1415         block_length = rb_size - file_offset;
1416       } else {
1417         block_length = read_length;
1418       }
1419     } else {
1420       file_offset = rpos;
1421       block_length = read_length;
1422     }
1423
1424     /* while we still have data to read, we loop */
1425     while (read_length > 0) {
1426       gint64 read_return;
1427
1428       ret =
1429           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1430           data, &read_return);
1431       if (ret != GST_FLOW_OK)
1432         goto read_error;
1433
1434       file_offset += read_return;
1435       if (QUEUE_IS_USING_RING_BUFFER (queue))
1436         file_offset %= rb_size;
1437
1438       data += read_return;
1439       read_length -= read_return;
1440       block_length = read_length;
1441       remaining -= read_return;
1442
1443       rpos = (queue->current->reading_pos += read_return);
1444       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1445     }
1446     GST_QUEUE2_SIGNAL_DEL (queue);
1447     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1448   }
1449
1450   gst_buffer_unmap (buf, &info);
1451   gst_buffer_resize (buf, 0, length);
1452
1453   GST_BUFFER_OFFSET (buf) = offset;
1454   GST_BUFFER_OFFSET_END (buf) = offset + length;
1455
1456   *buffer = buf;
1457
1458   return ret;
1459
1460   /* ERRORS */
1461 hit_eos:
1462   {
1463     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1464     gst_buffer_unmap (buf, &info);
1465     if (*buffer == NULL)
1466       gst_buffer_unref (buf);
1467     return GST_FLOW_EOS;
1468   }
1469 out_flushing:
1470   {
1471     GST_DEBUG_OBJECT (queue, "we are flushing");
1472     gst_buffer_unmap (buf, &info);
1473     if (*buffer == NULL)
1474       gst_buffer_unref (buf);
1475     return GST_FLOW_FLUSHING;
1476   }
1477 read_error:
1478   {
1479     GST_DEBUG_OBJECT (queue, "we have a read error");
1480     gst_buffer_unmap (buf, &info);
1481     if (*buffer == NULL)
1482       gst_buffer_unref (buf);
1483     return ret;
1484   }
1485 }
1486
1487 /* should be called with QUEUE_LOCK */
1488 static GstMiniObject *
1489 gst_queue2_read_item_from_file (GstQueue2 * queue)
1490 {
1491   GstMiniObject *item;
1492
1493   if (queue->stream_start_event != NULL) {
1494     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1495     queue->stream_start_event = NULL;
1496   } else if (queue->starting_segment != NULL) {
1497     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1498     queue->starting_segment = NULL;
1499   } else {
1500     GstFlowReturn ret;
1501     GstBuffer *buffer = NULL;
1502     guint64 reading_pos;
1503
1504     reading_pos = queue->current->reading_pos;
1505
1506     ret =
1507         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1508         &buffer);
1509
1510     switch (ret) {
1511       case GST_FLOW_OK:
1512         item = GST_MINI_OBJECT_CAST (buffer);
1513         break;
1514       case GST_FLOW_EOS:
1515         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1516         break;
1517       default:
1518         item = NULL;
1519         break;
1520     }
1521   }
1522   return item;
1523 }
1524
1525 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1526  * the temp filename. */
1527 static gboolean
1528 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1529 {
1530   gint fd = -1;
1531   gchar *name = NULL;
1532
1533   if (queue->temp_file)
1534     goto already_opened;
1535
1536   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1537
1538   /* If temp_template was set, allocate a filename and open that filen */
1539
1540   /* nothing to do */
1541   if (queue->temp_template == NULL)
1542     goto no_directory;
1543
1544   /* make copy of the template, we don't want to change this */
1545   name = g_strdup (queue->temp_template);
1546   fd = g_mkstemp (name);
1547   if (fd == -1)
1548     goto mkstemp_failed;
1549
1550   /* open the file for update/writing */
1551   queue->temp_file = fdopen (fd, "wb+");
1552   /* error creating file */
1553   if (queue->temp_file == NULL)
1554     goto open_failed;
1555
1556   g_free (queue->temp_location);
1557   queue->temp_location = name;
1558
1559   GST_QUEUE2_MUTEX_UNLOCK (queue);
1560
1561   /* we can't emit the notify with the lock */
1562   g_object_notify (G_OBJECT (queue), "temp-location");
1563
1564   GST_QUEUE2_MUTEX_LOCK (queue);
1565
1566   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1567
1568   return TRUE;
1569
1570   /* ERRORS */
1571 already_opened:
1572   {
1573     GST_DEBUG_OBJECT (queue, "temp file was already open");
1574     return TRUE;
1575   }
1576 no_directory:
1577   {
1578     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1579         (_("No Temp directory specified.")), (NULL));
1580     return FALSE;
1581   }
1582 mkstemp_failed:
1583   {
1584     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1585         (_("Could not create temp file \"%s\"."), queue->temp_template),
1586         GST_ERROR_SYSTEM);
1587     g_free (name);
1588     return FALSE;
1589   }
1590 open_failed:
1591   {
1592     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1593         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1594     g_free (name);
1595     if (fd != -1)
1596       close (fd);
1597     return FALSE;
1598   }
1599 }
1600
1601 static void
1602 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1603 {
1604   /* nothing to do */
1605   if (queue->temp_file == NULL)
1606     return;
1607
1608   GST_DEBUG_OBJECT (queue, "closing temp file");
1609
1610   fflush (queue->temp_file);
1611   fclose (queue->temp_file);
1612
1613   if (queue->temp_remove) {
1614     if (remove (queue->temp_location) < 0) {
1615       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1616           queue->temp_location, g_strerror (errno));
1617     }
1618   }
1619
1620   queue->temp_file = NULL;
1621   clean_ranges (queue);
1622 }
1623
1624 static void
1625 gst_queue2_flush_temp_file (GstQueue2 * queue)
1626 {
1627   if (queue->temp_file == NULL)
1628     return;
1629
1630   GST_DEBUG_OBJECT (queue, "flushing temp file");
1631
1632   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1633 }
1634
1635 static void
1636 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1637 {
1638   if (!QUEUE_IS_USING_QUEUE (queue)) {
1639     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1640       gst_queue2_flush_temp_file (queue);
1641     init_ranges (queue);
1642   } else {
1643     while (!g_queue_is_empty (&queue->queue)) {
1644       GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1645
1646       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1647           && GST_EVENT_IS_STICKY (qitem->item)
1648           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1649           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1650         gst_pad_store_sticky_event (queue->srcpad,
1651             GST_EVENT_CAST (qitem->item));
1652       }
1653
1654       /* Then lose another reference because we are supposed to destroy that
1655          data when flushing */
1656       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1657         gst_mini_object_unref (qitem->item);
1658       g_slice_free (GstQueue2Item, qitem);
1659     }
1660   }
1661   queue->last_query = FALSE;
1662   g_cond_signal (&queue->query_handled);
1663   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1664   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1665   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1666   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1667   queue->sink_tainted = queue->src_tainted = TRUE;
1668   if (queue->starting_segment != NULL)
1669     gst_event_unref (queue->starting_segment);
1670   queue->starting_segment = NULL;
1671   queue->segment_event_received = FALSE;
1672   gst_event_replace (&queue->stream_start_event, NULL);
1673
1674   /* we deleted a lot of something */
1675   GST_QUEUE2_SIGNAL_DEL (queue);
1676 }
1677
1678 static gboolean
1679 gst_queue2_wait_free_space (GstQueue2 * queue)
1680 {
1681   /* We make space available if we're "full" according to whatever
1682    * the user defined as "full". */
1683   if (gst_queue2_is_filled (queue)) {
1684     gboolean started;
1685
1686     /* pause the timer while we wait. The fact that we are waiting does not mean
1687      * the byterate on the input pad is lower */
1688     if ((started = queue->in_timer_started))
1689       g_timer_stop (queue->in_timer);
1690
1691     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1692         "queue is full, waiting for free space");
1693     do {
1694       GST_QUEUE2_MUTEX_UNLOCK (queue);
1695       g_signal_emit (queue, gst_queue2_signals[SIGNAL_OVERRUN], 0);
1696       GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
1697       /* we recheck, the signal could have changed the thresholds */
1698       if (!gst_queue2_is_filled (queue))
1699         break;
1700
1701       /* Wait for space to be available, we could be unlocked because of a flush. */
1702       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1703     }
1704     while (gst_queue2_is_filled (queue));
1705
1706     /* and continue if we were running before */
1707     if (started)
1708       g_timer_continue (queue->in_timer);
1709   }
1710   return TRUE;
1711
1712   /* ERRORS */
1713 out_flushing:
1714   {
1715     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1716     return FALSE;
1717   }
1718 }
1719
1720 static gboolean
1721 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1722 {
1723   GstMapInfo info;
1724   guint8 *data, *ring_buffer;
1725   guint size, rb_size;
1726   guint64 writing_pos, new_writing_pos;
1727   GstQueue2Range *range, *prev, *next;
1728   gboolean do_seek = FALSE;
1729
1730   if (QUEUE_IS_USING_RING_BUFFER (queue))
1731     writing_pos = queue->current->rb_writing_pos;
1732   else
1733     writing_pos = queue->current->writing_pos;
1734   ring_buffer = queue->ring_buffer;
1735   rb_size = queue->ring_buffer_max_size;
1736
1737   gst_buffer_map (buffer, &info, GST_MAP_READ);
1738
1739   size = info.size;
1740   data = info.data;
1741
1742   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1743       writing_pos);
1744
1745   /* sanity check */
1746   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1747       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1748     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1749         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1750         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1751   }
1752
1753   while (size > 0) {
1754     guint to_write;
1755
1756     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1757       gint64 space;
1758
1759       /* calculate the space in the ring buffer not used by data from
1760        * the current range */
1761       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1762         /* wait until there is some free space */
1763         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1764       }
1765       /* get the amount of space we have */
1766       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1767
1768       /* calculate if we need to split or if we can write the entire
1769        * buffer now */
1770       to_write = MIN (size, space);
1771
1772       /* the writing position in the ring buffer after writing (part
1773        * or all of) the buffer */
1774       new_writing_pos = (writing_pos + to_write) % rb_size;
1775
1776       prev = NULL;
1777       range = queue->ranges;
1778
1779       /* if we need to overwrite data in the ring buffer, we need to
1780        * update the ranges
1781        *
1782        * warning: this code is complicated and includes some
1783        * simplifications - pen, paper and diagrams for the cases
1784        * recommended! */
1785       while (range) {
1786         guint64 range_data_start, range_data_end;
1787         GstQueue2Range *range_to_destroy = NULL;
1788
1789         range_data_start = range->rb_offset;
1790         range_data_end = range->rb_writing_pos;
1791
1792         /* handle the special case where the range has no data in it */
1793         if (range->writing_pos == range->offset) {
1794           if (range != queue->current) {
1795             GST_DEBUG_OBJECT (queue,
1796                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1797                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1798             /* remove range */
1799             range_to_destroy = range;
1800             if (prev)
1801               prev->next = range->next;
1802           }
1803           goto next_range;
1804         }
1805
1806         if (range_data_end > range_data_start) {
1807           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1808             goto next_range;
1809
1810           if (new_writing_pos > range_data_start) {
1811             if (new_writing_pos >= range_data_end) {
1812               GST_DEBUG_OBJECT (queue,
1813                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1814                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1815               /* remove range */
1816               range_to_destroy = range;
1817               if (prev)
1818                 prev->next = range->next;
1819             } else {
1820               GST_DEBUG_OBJECT (queue,
1821                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1822                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1823                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1824                   range->offset + new_writing_pos - range_data_start,
1825                   new_writing_pos);
1826               range->offset += (new_writing_pos - range_data_start);
1827               range->rb_offset = new_writing_pos;
1828             }
1829           }
1830         } else {
1831           guint64 new_wpos_virt = writing_pos + to_write;
1832
1833           if (new_wpos_virt <= range_data_start)
1834             goto next_range;
1835
1836           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1837             GST_DEBUG_OBJECT (queue,
1838                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1839                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1840             /* remove range */
1841             range_to_destroy = range;
1842             if (prev)
1843               prev->next = range->next;
1844           } else {
1845             GST_DEBUG_OBJECT (queue,
1846                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1847                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1848                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1849                 range->offset + new_writing_pos - range_data_start,
1850                 new_writing_pos);
1851             range->offset += (new_wpos_virt - range_data_start);
1852             range->rb_offset = new_writing_pos;
1853           }
1854         }
1855
1856       next_range:
1857         if (!range_to_destroy)
1858           prev = range;
1859
1860         range = range->next;
1861         if (range_to_destroy) {
1862           if (range_to_destroy == queue->ranges)
1863             queue->ranges = range;
1864           g_slice_free (GstQueue2Range, range_to_destroy);
1865           range_to_destroy = NULL;
1866         }
1867       }
1868     } else {
1869       to_write = size;
1870       new_writing_pos = writing_pos + to_write;
1871     }
1872
1873     if (QUEUE_IS_USING_TEMP_FILE (queue)
1874         && FSEEK_FILE (queue->temp_file, writing_pos))
1875       goto seek_failed;
1876
1877     if (new_writing_pos > writing_pos) {
1878       GST_INFO_OBJECT (queue,
1879           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1880           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
1881           queue->current->writing_pos, queue->current->rb_writing_pos);
1882       /* either not using ring buffer or no wrapping, just write */
1883       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1884         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
1885           goto handle_error;
1886       } else {
1887         memcpy (ring_buffer + writing_pos, data, to_write);
1888       }
1889
1890       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1891         /* try to merge with next range */
1892         while ((next = queue->current->next)) {
1893           GST_INFO_OBJECT (queue,
1894               "checking merge with next range %" G_GUINT64_FORMAT " < %"
1895               G_GUINT64_FORMAT, new_writing_pos, next->offset);
1896           if (new_writing_pos < next->offset)
1897             break;
1898
1899           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
1900               next->writing_pos);
1901
1902           /* remove the group */
1903           queue->current->next = next->next;
1904
1905           /* We use the threshold to decide if we want to do a seek or simply
1906            * read the data again. If there is not so much data in the range we
1907            * prefer to avoid to seek and read it again. */
1908           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
1909             /* the new range had more data than the threshold, it's worth keeping
1910              * it and doing a seek. */
1911             new_writing_pos = next->writing_pos;
1912             do_seek = TRUE;
1913           }
1914           g_slice_free (GstQueue2Range, next);
1915         }
1916         goto update_and_signal;
1917       }
1918     } else {
1919       /* wrapping */
1920       guint block_one, block_two;
1921
1922       block_one = rb_size - writing_pos;
1923       block_two = to_write - block_one;
1924
1925       if (block_one > 0) {
1926         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
1927         /* write data to end of ring buffer */
1928         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1929           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
1930             goto handle_error;
1931         } else {
1932           memcpy (ring_buffer + writing_pos, data, block_one);
1933         }
1934       }
1935
1936       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
1937         goto seek_failed;
1938
1939       if (block_two > 0) {
1940         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
1941         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1942           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
1943             goto handle_error;
1944         } else {
1945           memcpy (ring_buffer, data + block_one, block_two);
1946         }
1947       }
1948     }
1949
1950   update_and_signal:
1951     /* update the writing positions */
1952     size -= to_write;
1953     GST_INFO_OBJECT (queue,
1954         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
1955         to_write, writing_pos, size);
1956
1957     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1958       data += to_write;
1959       queue->current->writing_pos += to_write;
1960       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
1961     } else {
1962       queue->current->writing_pos = writing_pos = new_writing_pos;
1963     }
1964     if (do_seek)
1965       perform_seek_to_offset (queue, new_writing_pos);
1966
1967     update_cur_level (queue, queue->current);
1968
1969     /* update the buffering status */
1970     if (queue->use_buffering)
1971       update_buffering (queue);
1972
1973     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1974         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1975
1976     GST_QUEUE2_SIGNAL_ADD (queue);
1977   }
1978
1979   gst_buffer_unmap (buffer, &info);
1980
1981   return TRUE;
1982
1983   /* ERRORS */
1984 out_flushing:
1985   {
1986     GST_DEBUG_OBJECT (queue, "we are flushing");
1987     gst_buffer_unmap (buffer, &info);
1988     /* FIXME - GST_FLOW_EOS ? */
1989     return FALSE;
1990   }
1991 seek_failed:
1992   {
1993     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1994     gst_buffer_unmap (buffer, &info);
1995     return FALSE;
1996   }
1997 handle_error:
1998   {
1999     switch (errno) {
2000       case ENOSPC:{
2001         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2002         break;
2003       }
2004       default:{
2005         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2006             (_("Error while writing to download file.")),
2007             ("%s", g_strerror (errno)));
2008       }
2009     }
2010     gst_buffer_unmap (buffer, &info);
2011     return FALSE;
2012   }
2013 }
2014
2015 static gboolean
2016 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2017 {
2018   GstQueue2 *queue = q;
2019
2020   GST_TRACE_OBJECT (queue,
2021       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2022       gst_buffer_get_size (*buf));
2023
2024   if (!gst_queue2_create_write (queue, *buf)) {
2025     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2026     return FALSE;
2027   }
2028   return TRUE;
2029 }
2030
2031 static gboolean
2032 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
2033 {
2034   guint *p_size = data;
2035   gsize buf_size;
2036
2037   buf_size = gst_buffer_get_size (*buf);
2038   GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
2039   *p_size += buf_size;
2040   return TRUE;
2041 }
2042
2043 /* enqueue an item an update the level stats */
2044 static void
2045 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2046     GstQueue2ItemType item_type)
2047 {
2048   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2049     GstBuffer *buffer;
2050     guint size;
2051
2052     buffer = GST_BUFFER_CAST (item);
2053     size = gst_buffer_get_size (buffer);
2054
2055     /* add buffer to the statistics */
2056     if (QUEUE_IS_USING_QUEUE (queue)) {
2057       queue->cur_level.buffers++;
2058       queue->cur_level.bytes += size;
2059     }
2060     queue->bytes_in += size;
2061
2062     /* apply new buffer to segment stats */
2063     apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
2064     /* update the byterate stats */
2065     update_in_rates (queue);
2066
2067     if (!QUEUE_IS_USING_QUEUE (queue)) {
2068       /* FIXME - check return value? */
2069       gst_queue2_create_write (queue, buffer);
2070     }
2071   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2072     GstBufferList *buffer_list;
2073     guint size = 0;
2074
2075     buffer_list = GST_BUFFER_LIST_CAST (item);
2076
2077     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2078     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2079
2080     /* add buffer to the statistics */
2081     if (QUEUE_IS_USING_QUEUE (queue)) {
2082       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2083       queue->cur_level.bytes += size;
2084     }
2085     queue->bytes_in += size;
2086
2087     /* apply new buffer to segment stats */
2088     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2089
2090     /* update the byterate stats */
2091     update_in_rates (queue);
2092
2093     if (!QUEUE_IS_USING_QUEUE (queue)) {
2094       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2095     }
2096   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2097     GstEvent *event;
2098
2099     event = GST_EVENT_CAST (item);
2100
2101     switch (GST_EVENT_TYPE (event)) {
2102       case GST_EVENT_EOS:
2103         /* Zero the thresholds, this makes sure the queue is completely
2104          * filled and we can read all data from the queue. */
2105         GST_DEBUG_OBJECT (queue, "we have EOS");
2106         queue->is_eos = TRUE;
2107         break;
2108       case GST_EVENT_SEGMENT:
2109         apply_segment (queue, event, &queue->sink_segment, TRUE);
2110         /* This is our first new segment, we hold it
2111          * as we can't save it on the temp file */
2112         if (!QUEUE_IS_USING_QUEUE (queue)) {
2113           if (queue->segment_event_received)
2114             goto unexpected_event;
2115
2116           queue->segment_event_received = TRUE;
2117           if (queue->starting_segment != NULL)
2118             gst_event_unref (queue->starting_segment);
2119           queue->starting_segment = event;
2120           item = NULL;
2121         }
2122         /* a new segment allows us to accept more buffers if we got EOS
2123          * from downstream */
2124         queue->unexpected = FALSE;
2125         break;
2126       case GST_EVENT_GAP:
2127         apply_gap (queue, event, &queue->sink_segment, TRUE);
2128         break;
2129       case GST_EVENT_STREAM_START:
2130         if (!QUEUE_IS_USING_QUEUE (queue)) {
2131           gst_event_replace (&queue->stream_start_event, event);
2132           gst_event_unref (event);
2133           item = NULL;
2134         }
2135         break;
2136       case GST_EVENT_CAPS:{
2137         GstCaps *caps;
2138
2139         gst_event_parse_caps (event, &caps);
2140         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2141
2142         if (!QUEUE_IS_USING_QUEUE (queue)) {
2143           GST_LOG ("Dropping caps event, not using queue");
2144           gst_event_unref (event);
2145           item = NULL;
2146         }
2147         break;
2148       }
2149       default:
2150         if (!QUEUE_IS_USING_QUEUE (queue))
2151           goto unexpected_event;
2152         break;
2153     }
2154   } else if (GST_IS_QUERY (item)) {
2155     /* Can't happen as we check that in the caller */
2156     if (!QUEUE_IS_USING_QUEUE (queue))
2157       g_assert_not_reached ();
2158   } else {
2159     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2160         item, GST_OBJECT_NAME (queue));
2161     /* we can't really unref since we don't know what it is */
2162     item = NULL;
2163   }
2164
2165   if (item) {
2166     /* update the buffering status */
2167     if (queue->use_buffering)
2168       update_buffering (queue);
2169
2170     if (QUEUE_IS_USING_QUEUE (queue)) {
2171       GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2172       qitem->type = item_type;
2173       qitem->item = item;
2174       g_queue_push_tail (&queue->queue, qitem);
2175     } else {
2176       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2177     }
2178
2179     GST_QUEUE2_SIGNAL_ADD (queue);
2180   }
2181
2182   return;
2183
2184   /* ERRORS */
2185 unexpected_event:
2186   {
2187     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2188
2189     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2190         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2191         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2192     gst_event_unref (GST_EVENT_CAST (item));
2193     return;
2194   }
2195 }
2196
2197 /* dequeue an item from the queue and update level stats */
2198 static GstMiniObject *
2199 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2200 {
2201   GstMiniObject *item;
2202
2203   if (!QUEUE_IS_USING_QUEUE (queue)) {
2204     item = gst_queue2_read_item_from_file (queue);
2205   } else {
2206     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2207
2208     if (qitem == NULL)
2209       goto no_item;
2210
2211     item = qitem->item;
2212     g_slice_free (GstQueue2Item, qitem);
2213   }
2214
2215   if (item == NULL)
2216     goto no_item;
2217
2218   if (GST_IS_BUFFER (item)) {
2219     GstBuffer *buffer;
2220     guint size;
2221
2222     buffer = GST_BUFFER_CAST (item);
2223     size = gst_buffer_get_size (buffer);
2224     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2225
2226     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2227         "retrieved buffer %p from queue", buffer);
2228
2229     if (QUEUE_IS_USING_QUEUE (queue)) {
2230       queue->cur_level.buffers--;
2231       queue->cur_level.bytes -= size;
2232     }
2233     queue->bytes_out += size;
2234
2235     apply_buffer (queue, buffer, &queue->src_segment, FALSE);
2236     /* update the byterate stats */
2237     update_out_rates (queue);
2238     /* update the buffering */
2239     if (queue->use_buffering)
2240       update_buffering (queue);
2241
2242   } else if (GST_IS_EVENT (item)) {
2243     GstEvent *event = GST_EVENT_CAST (item);
2244
2245     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2246
2247     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2248         "retrieved event %p from queue", event);
2249
2250     switch (GST_EVENT_TYPE (event)) {
2251       case GST_EVENT_EOS:
2252         /* queue is empty now that we dequeued the EOS */
2253         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2254         break;
2255       case GST_EVENT_SEGMENT:
2256         apply_segment (queue, event, &queue->src_segment, FALSE);
2257         break;
2258       case GST_EVENT_GAP:
2259         apply_gap (queue, event, &queue->src_segment, FALSE);
2260         break;
2261       default:
2262         break;
2263     }
2264   } else if (GST_IS_BUFFER_LIST (item)) {
2265     GstBufferList *buffer_list;
2266     guint size = 0;
2267
2268     buffer_list = GST_BUFFER_LIST_CAST (item);
2269     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2270     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2271
2272     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2273         "retrieved buffer list %p from queue", buffer_list);
2274
2275     if (QUEUE_IS_USING_QUEUE (queue)) {
2276       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2277       queue->cur_level.bytes -= size;
2278     }
2279     queue->bytes_out += size;
2280
2281     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2282     /* update the byterate stats */
2283     update_out_rates (queue);
2284     /* update the buffering */
2285     if (queue->use_buffering)
2286       update_buffering (queue);
2287   } else if (GST_IS_QUERY (item)) {
2288     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2289         "retrieved query %p from queue", item);
2290     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2291   } else {
2292     g_warning
2293         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2294         item, GST_OBJECT_NAME (queue));
2295     item = NULL;
2296     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2297   }
2298   GST_QUEUE2_SIGNAL_DEL (queue);
2299
2300   return item;
2301
2302   /* ERRORS */
2303 no_item:
2304   {
2305     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2306     return NULL;
2307   }
2308 }
2309
2310 static gboolean
2311 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2312     GstEvent * event)
2313 {
2314   gboolean ret = TRUE;
2315   GstQueue2 *queue;
2316
2317   queue = GST_QUEUE2 (parent);
2318
2319   switch (GST_EVENT_TYPE (event)) {
2320     case GST_EVENT_FLUSH_START:
2321     {
2322       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2323       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2324         /* forward event */
2325         ret = gst_pad_push_event (queue->srcpad, event);
2326
2327         /* now unblock the chain function */
2328         GST_QUEUE2_MUTEX_LOCK (queue);
2329         queue->srcresult = GST_FLOW_FLUSHING;
2330         queue->sinkresult = GST_FLOW_FLUSHING;
2331         /* unblock the loop and chain functions */
2332         GST_QUEUE2_SIGNAL_ADD (queue);
2333         GST_QUEUE2_SIGNAL_DEL (queue);
2334         queue->last_query = FALSE;
2335         g_cond_signal (&queue->query_handled);
2336         GST_QUEUE2_MUTEX_UNLOCK (queue);
2337
2338         /* make sure it pauses, this should happen since we sent
2339          * flush_start downstream. */
2340         gst_pad_pause_task (queue->srcpad);
2341         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2342       } else {
2343         GST_QUEUE2_MUTEX_LOCK (queue);
2344         /* flush the sink pad */
2345         queue->sinkresult = GST_FLOW_FLUSHING;
2346         GST_QUEUE2_SIGNAL_DEL (queue);
2347         queue->last_query = FALSE;
2348         g_cond_signal (&queue->query_handled);
2349         GST_QUEUE2_MUTEX_UNLOCK (queue);
2350
2351         gst_event_unref (event);
2352       }
2353       break;
2354     }
2355     case GST_EVENT_FLUSH_STOP:
2356     {
2357       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2358
2359       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2360         /* forward event */
2361         ret = gst_pad_push_event (queue->srcpad, event);
2362
2363         GST_QUEUE2_MUTEX_LOCK (queue);
2364         gst_queue2_locked_flush (queue, FALSE, TRUE);
2365         queue->srcresult = GST_FLOW_OK;
2366         queue->sinkresult = GST_FLOW_OK;
2367         queue->is_eos = FALSE;
2368         queue->unexpected = FALSE;
2369         queue->seeking = FALSE;
2370         /* reset rate counters */
2371         reset_rate_timer (queue);
2372         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2373             queue->srcpad, NULL);
2374         GST_QUEUE2_MUTEX_UNLOCK (queue);
2375       } else {
2376         GST_QUEUE2_MUTEX_LOCK (queue);
2377         queue->segment_event_received = FALSE;
2378         queue->is_eos = FALSE;
2379         queue->unexpected = FALSE;
2380         queue->sinkresult = GST_FLOW_OK;
2381         queue->seeking = FALSE;
2382         GST_QUEUE2_MUTEX_UNLOCK (queue);
2383
2384         gst_event_unref (event);
2385       }
2386       break;
2387     }
2388     default:
2389       if (GST_EVENT_IS_SERIALIZED (event)) {
2390         /* serialized events go in the queue */
2391         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2392         if (queue->srcresult != GST_FLOW_OK) {
2393           /* Errors in sticky event pushing are no problem and ignored here
2394            * as they will cause more meaningful errors during data flow.
2395            * For EOS events, that are not followed by data flow, we still
2396            * return FALSE here though and report an error.
2397            */
2398           if (!GST_EVENT_IS_STICKY (event)) {
2399             goto out_flow_error;
2400           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2401             if (queue->srcresult == GST_FLOW_NOT_LINKED
2402                 || queue->srcresult < GST_FLOW_EOS) {
2403               GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2404                   (_("Internal data flow error.")),
2405                   ("streaming task paused, reason %s (%d)",
2406                       gst_flow_get_name (queue->srcresult), queue->srcresult));
2407             }
2408             goto out_flow_error;
2409           }
2410         }
2411         /* refuse more events on EOS */
2412         if (queue->is_eos)
2413           goto out_eos;
2414         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2415         GST_QUEUE2_MUTEX_UNLOCK (queue);
2416         gst_queue2_post_buffering (queue);
2417       } else {
2418         /* non-serialized events are passed upstream. */
2419         ret = gst_pad_push_event (queue->srcpad, event);
2420       }
2421       break;
2422   }
2423   return ret;
2424
2425   /* ERRORS */
2426 out_flushing:
2427   {
2428     GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2429     GST_QUEUE2_MUTEX_UNLOCK (queue);
2430     gst_event_unref (event);
2431     return FALSE;
2432   }
2433 out_eos:
2434   {
2435     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2436     GST_QUEUE2_MUTEX_UNLOCK (queue);
2437     gst_event_unref (event);
2438     return FALSE;
2439   }
2440 out_flow_error:
2441   {
2442     GST_LOG_OBJECT (queue,
2443         "refusing event, we have a downstream flow error: %s",
2444         gst_flow_get_name (queue->srcresult));
2445     GST_QUEUE2_MUTEX_UNLOCK (queue);
2446     gst_event_unref (event);
2447     return FALSE;
2448   }
2449 }
2450
2451 static gboolean
2452 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2453     GstQuery * query)
2454 {
2455   GstQueue2 *queue;
2456   gboolean res;
2457
2458   queue = GST_QUEUE2 (parent);
2459
2460   switch (GST_QUERY_TYPE (query)) {
2461     default:
2462       if (GST_QUERY_IS_SERIALIZED (query)) {
2463         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2464         /* serialized events go in the queue. We need to be certain that we
2465          * don't cause deadlocks waiting for the query return value. We check if
2466          * the queue is empty (nothing is blocking downstream and the query can
2467          * be pushed for sure) or we are not buffering. If we are buffering,
2468          * the pipeline waits to unblock downstream until our queue fills up
2469          * completely, which can not happen if we block on the query..
2470          * Therefore we only potentially block when we are not buffering. */
2471         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2472         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2473                 || !queue->use_buffering)) {
2474           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2475             gst_queue2_locked_enqueue (queue, query,
2476                 GST_QUEUE2_ITEM_TYPE_QUERY);
2477
2478             STATUS (queue, queue->sinkpad, "wait for QUERY");
2479             g_cond_wait (&queue->query_handled, &queue->qlock);
2480             if (queue->sinkresult != GST_FLOW_OK)
2481               goto out_flushing;
2482             res = queue->last_query;
2483           } else {
2484             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2485             res = FALSE;
2486           }
2487         } else {
2488           GST_DEBUG_OBJECT (queue,
2489               "refusing query, we are not using the queue");
2490           res = FALSE;
2491         }
2492         GST_QUEUE2_MUTEX_UNLOCK (queue);
2493         gst_queue2_post_buffering (queue);
2494       } else {
2495         res = gst_pad_query_default (pad, parent, query);
2496       }
2497       break;
2498   }
2499   return res;
2500
2501   /* ERRORS */
2502 out_flushing:
2503   {
2504     GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
2505     GST_QUEUE2_MUTEX_UNLOCK (queue);
2506     return FALSE;
2507   }
2508 }
2509
2510 static gboolean
2511 gst_queue2_is_empty (GstQueue2 * queue)
2512 {
2513   /* never empty on EOS */
2514   if (queue->is_eos)
2515     return FALSE;
2516
2517   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2518     return queue->current->writing_pos <= queue->current->max_reading_pos;
2519   } else {
2520     if (queue->queue.length == 0)
2521       return TRUE;
2522   }
2523
2524   return FALSE;
2525 }
2526
2527 static gboolean
2528 gst_queue2_is_filled (GstQueue2 * queue)
2529 {
2530   gboolean res;
2531
2532   /* always filled on EOS */
2533   if (queue->is_eos)
2534     return TRUE;
2535
2536 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2537     (queue->cur_level.format) >= ((alt_max) ? \
2538       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2539
2540   /* if using a ring buffer we're filled if all ring buffer space is used
2541    * _by the current range_ */
2542   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2543     guint64 rb_size = queue->ring_buffer_max_size;
2544     GST_DEBUG_OBJECT (queue,
2545         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2546         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2547     return CHECK_FILLED (bytes, rb_size);
2548   }
2549
2550   /* if using file, we're never filled if we don't have EOS */
2551   if (QUEUE_IS_USING_TEMP_FILE (queue))
2552     return FALSE;
2553
2554   /* we are never filled when we have no buffers at all */
2555   if (queue->cur_level.buffers == 0)
2556     return FALSE;
2557
2558   /* we are filled if one of the current levels exceeds the max */
2559   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2560       || CHECK_FILLED (time, 0);
2561
2562   /* if we need to, use the rate estimate to check against the max time we are
2563    * allowed to queue */
2564   if (queue->use_rate_estimate)
2565     res |= CHECK_FILLED (rate_time, 0);
2566
2567 #undef CHECK_FILLED
2568   return res;
2569 }
2570
2571 static GstFlowReturn
2572 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2573     GstMiniObject * item, GstQueue2ItemType item_type)
2574 {
2575   /* we have to lock the queue since we span threads */
2576   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2577   /* when we received EOS, we refuse more data */
2578   if (queue->is_eos)
2579     goto out_eos;
2580   /* when we received unexpected from downstream, refuse more buffers */
2581   if (queue->unexpected)
2582     goto out_unexpected;
2583
2584   /* while we didn't receive the newsegment, we're seeking and we skip data */
2585   if (queue->seeking)
2586     goto out_seeking;
2587
2588   if (!gst_queue2_wait_free_space (queue))
2589     goto out_flushing;
2590
2591   /* put buffer in queue now */
2592   gst_queue2_locked_enqueue (queue, item, item_type);
2593   GST_QUEUE2_MUTEX_UNLOCK (queue);
2594   gst_queue2_post_buffering (queue);
2595
2596   return GST_FLOW_OK;
2597
2598   /* special conditions */
2599 out_flushing:
2600   {
2601     GstFlowReturn ret = queue->sinkresult;
2602
2603     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2604         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2605     GST_QUEUE2_MUTEX_UNLOCK (queue);
2606     gst_mini_object_unref (item);
2607
2608     return ret;
2609   }
2610 out_eos:
2611   {
2612     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2613     GST_QUEUE2_MUTEX_UNLOCK (queue);
2614     gst_mini_object_unref (item);
2615
2616     return GST_FLOW_EOS;
2617   }
2618 out_seeking:
2619   {
2620     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2621     GST_QUEUE2_MUTEX_UNLOCK (queue);
2622     gst_mini_object_unref (item);
2623
2624     return GST_FLOW_OK;
2625   }
2626 out_unexpected:
2627   {
2628     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2629     GST_QUEUE2_MUTEX_UNLOCK (queue);
2630     gst_mini_object_unref (item);
2631
2632     return GST_FLOW_EOS;
2633   }
2634 }
2635
2636 static GstFlowReturn
2637 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2638 {
2639   GstQueue2 *queue;
2640
2641   queue = GST_QUEUE2 (parent);
2642
2643   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2644       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2645       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2646       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2647       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2648
2649   return gst_queue2_chain_buffer_or_buffer_list (queue,
2650       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2651 }
2652
2653 static GstFlowReturn
2654 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2655     GstBufferList * buffer_list)
2656 {
2657   GstQueue2 *queue;
2658
2659   queue = GST_QUEUE2 (parent);
2660
2661   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2662       "received buffer list %p", buffer_list);
2663
2664   return gst_queue2_chain_buffer_or_buffer_list (queue,
2665       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2666 }
2667
2668 static GstMiniObject *
2669 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2670 {
2671   GstMiniObject *data;
2672
2673   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2674
2675   /* stop pushing buffers, we dequeue all items until we see an item that we
2676    * can push again, which is EOS or SEGMENT. If there is nothing in the
2677    * queue we can push, we set a flag to make the sinkpad refuse more
2678    * buffers with an EOS return value until we receive something
2679    * pushable again or we get flushed. */
2680   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2681     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2682       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2683           "dropping EOS buffer %p", data);
2684       gst_buffer_unref (GST_BUFFER_CAST (data));
2685     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2686       GstEvent *event = GST_EVENT_CAST (data);
2687       GstEventType type = GST_EVENT_TYPE (event);
2688
2689       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2690         /* we found a pushable item in the queue, push it out */
2691         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2692             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2693         return data;
2694       }
2695       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2696           "dropping EOS event %p", event);
2697       gst_event_unref (event);
2698     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2699       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2700           "dropping EOS buffer list %p", data);
2701       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2702     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2703       queue->last_query = FALSE;
2704       g_cond_signal (&queue->query_handled);
2705       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2706     }
2707   }
2708   /* no more items in the queue. Set the unexpected flag so that upstream
2709    * make us refuse any more buffers on the sinkpad. Since we will still
2710    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2711    * task function does not shut down. */
2712   queue->unexpected = TRUE;
2713   return NULL;
2714 }
2715
2716 /* dequeue an item from the queue an push it downstream. This functions returns
2717  * the result of the push. */
2718 static GstFlowReturn
2719 gst_queue2_push_one (GstQueue2 * queue)
2720 {
2721   GstFlowReturn result = queue->srcresult;
2722   GstMiniObject *data;
2723   GstQueue2ItemType item_type;
2724
2725   data = gst_queue2_locked_dequeue (queue, &item_type);
2726   if (data == NULL)
2727     goto no_item;
2728
2729 next:
2730   STATUS (queue, queue->srcpad, "We have something dequeud");
2731   g_atomic_int_set (&queue->downstream_may_block,
2732       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2733       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2734   GST_QUEUE2_MUTEX_UNLOCK (queue);
2735   gst_queue2_post_buffering (queue);
2736
2737   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2738     GstBuffer *buffer;
2739
2740     buffer = GST_BUFFER_CAST (data);
2741
2742     result = gst_pad_push (queue->srcpad, buffer);
2743     g_atomic_int_set (&queue->downstream_may_block, 0);
2744
2745     /* need to check for srcresult here as well */
2746     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2747     if (result == GST_FLOW_EOS) {
2748       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2749       if (data != NULL)
2750         goto next;
2751       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2752        * to the caller so that the task function does not shut down */
2753       result = GST_FLOW_OK;
2754     }
2755   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2756     GstEvent *event = GST_EVENT_CAST (data);
2757     GstEventType type = GST_EVENT_TYPE (event);
2758
2759     gst_pad_push_event (queue->srcpad, event);
2760
2761     /* if we're EOS, return EOS so that the task pauses. */
2762     if (type == GST_EVENT_EOS) {
2763       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2764           "pushed EOS event %p, return EOS", event);
2765       result = GST_FLOW_EOS;
2766     }
2767
2768     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2769   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2770     GstBufferList *buffer_list;
2771
2772     buffer_list = GST_BUFFER_LIST_CAST (data);
2773
2774     result = gst_pad_push_list (queue->srcpad, buffer_list);
2775     g_atomic_int_set (&queue->downstream_may_block, 0);
2776
2777     /* need to check for srcresult here as well */
2778     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2779     if (result == GST_FLOW_EOS) {
2780       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2781       if (data != NULL)
2782         goto next;
2783       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2784        * to the caller so that the task function does not shut down */
2785       result = GST_FLOW_OK;
2786     }
2787   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2788     GstQuery *query = GST_QUERY_CAST (data);
2789
2790     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2791     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2792     GST_LOG_OBJECT (queue->srcpad, "Peered query");
2793     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2794         "did query %p, return %d", query, queue->last_query);
2795     g_cond_signal (&queue->query_handled);
2796     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2797     result = GST_FLOW_OK;
2798   }
2799   return result;
2800
2801   /* ERRORS */
2802 no_item:
2803   {
2804     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2805         "exit because we have no item in the queue");
2806     return GST_FLOW_ERROR;
2807   }
2808 out_flushing:
2809   {
2810     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2811     return GST_FLOW_FLUSHING;
2812   }
2813 }
2814
2815 /* called repeatedly with @pad as the source pad. This function should push out
2816  * data to the peer element. */
2817 static void
2818 gst_queue2_loop (GstPad * pad)
2819 {
2820   GstQueue2 *queue;
2821   GstFlowReturn ret;
2822
2823   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2824
2825   /* have to lock for thread-safety */
2826   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2827
2828   if (gst_queue2_is_empty (queue)) {
2829     gboolean started;
2830
2831     /* pause the timer while we wait. The fact that we are waiting does not mean
2832      * the byterate on the output pad is lower */
2833     if ((started = queue->out_timer_started))
2834       g_timer_stop (queue->out_timer);
2835
2836     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2837         "queue is empty, waiting for new data");
2838     do {
2839       /* Wait for data to be available, we could be unlocked because of a flush. */
2840       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2841     }
2842     while (gst_queue2_is_empty (queue));
2843
2844     /* and continue if we were running before */
2845     if (started)
2846       g_timer_continue (queue->out_timer);
2847   }
2848   ret = gst_queue2_push_one (queue);
2849   queue->srcresult = ret;
2850   queue->sinkresult = ret;
2851   if (ret != GST_FLOW_OK)
2852     goto out_flushing;
2853
2854   GST_QUEUE2_MUTEX_UNLOCK (queue);
2855   gst_queue2_post_buffering (queue);
2856
2857   return;
2858
2859   /* ERRORS */
2860 out_flushing:
2861   {
2862     gboolean eos = queue->is_eos;
2863     GstFlowReturn ret = queue->srcresult;
2864
2865     gst_pad_pause_task (queue->srcpad);
2866     if (ret == GST_FLOW_FLUSHING) {
2867       gst_queue2_locked_flush (queue, FALSE, FALSE);
2868     } else {
2869       GST_QUEUE2_SIGNAL_DEL (queue);
2870       queue->last_query = FALSE;
2871       g_cond_signal (&queue->query_handled);
2872     }
2873     GST_QUEUE2_MUTEX_UNLOCK (queue);
2874     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2875         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
2876     /* let app know about us giving up if upstream is not expected to do so */
2877     /* EOS is already taken care of elsewhere */
2878     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
2879       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2880           (_("Internal data flow error.")),
2881           ("streaming task paused, reason %s (%d)",
2882               gst_flow_get_name (ret), ret));
2883       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
2884     }
2885     return;
2886   }
2887 }
2888
2889 static gboolean
2890 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2891 {
2892   gboolean res = TRUE;
2893   GstQueue2 *queue = GST_QUEUE2 (parent);
2894
2895 #ifndef GST_DISABLE_GST_DEBUG
2896   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
2897       event, GST_EVENT_TYPE_NAME (event));
2898 #endif
2899
2900   switch (GST_EVENT_TYPE (event)) {
2901     case GST_EVENT_FLUSH_START:
2902       if (QUEUE_IS_USING_QUEUE (queue)) {
2903         /* just forward upstream */
2904         res = gst_pad_push_event (queue->sinkpad, event);
2905       } else {
2906         /* now unblock the getrange function */
2907         GST_QUEUE2_MUTEX_LOCK (queue);
2908         GST_DEBUG_OBJECT (queue, "flushing");
2909         queue->srcresult = GST_FLOW_FLUSHING;
2910         GST_QUEUE2_SIGNAL_ADD (queue);
2911         GST_QUEUE2_MUTEX_UNLOCK (queue);
2912
2913         /* when using a temp file, we eat the event */
2914         res = TRUE;
2915         gst_event_unref (event);
2916       }
2917       break;
2918     case GST_EVENT_FLUSH_STOP:
2919       if (QUEUE_IS_USING_QUEUE (queue)) {
2920         /* just forward upstream */
2921         res = gst_pad_push_event (queue->sinkpad, event);
2922       } else {
2923         /* now unblock the getrange function */
2924         GST_QUEUE2_MUTEX_LOCK (queue);
2925         queue->srcresult = GST_FLOW_OK;
2926         GST_QUEUE2_MUTEX_UNLOCK (queue);
2927
2928         /* when using a temp file, we eat the event */
2929         res = TRUE;
2930         gst_event_unref (event);
2931       }
2932       break;
2933     case GST_EVENT_RECONFIGURE:
2934       GST_QUEUE2_MUTEX_LOCK (queue);
2935       /* assume downstream is linked now and try to push again */
2936       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
2937         queue->srcresult = GST_FLOW_OK;
2938         queue->sinkresult = GST_FLOW_OK;
2939         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
2940           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
2941               NULL);
2942         }
2943       }
2944       GST_QUEUE2_MUTEX_UNLOCK (queue);
2945
2946       res = gst_pad_push_event (queue->sinkpad, event);
2947       break;
2948     default:
2949       res = gst_pad_push_event (queue->sinkpad, event);
2950       break;
2951   }
2952
2953   return res;
2954 }
2955
2956 static gboolean
2957 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2958 {
2959   GstQueue2 *queue;
2960
2961   queue = GST_QUEUE2 (parent);
2962
2963   switch (GST_QUERY_TYPE (query)) {
2964     case GST_QUERY_POSITION:
2965     {
2966       gint64 peer_pos;
2967       GstFormat format;
2968
2969       if (!gst_pad_peer_query (queue->sinkpad, query))
2970         goto peer_failed;
2971
2972       /* get peer position */
2973       gst_query_parse_position (query, &format, &peer_pos);
2974
2975       /* FIXME: this code assumes that there's no discont in the queue */
2976       switch (format) {
2977         case GST_FORMAT_BYTES:
2978           peer_pos -= queue->cur_level.bytes;
2979           break;
2980         case GST_FORMAT_TIME:
2981           peer_pos -= queue->cur_level.time;
2982           break;
2983         default:
2984           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
2985               "know how to adjust value", gst_format_get_name (format));
2986           return FALSE;
2987       }
2988       /* set updated position */
2989       gst_query_set_position (query, format, peer_pos);
2990       break;
2991     }
2992     case GST_QUERY_DURATION:
2993     {
2994       GST_DEBUG_OBJECT (queue, "doing peer query");
2995
2996       if (!gst_pad_peer_query (queue->sinkpad, query))
2997         goto peer_failed;
2998
2999       GST_DEBUG_OBJECT (queue, "peer query success");
3000       break;
3001     }
3002     case GST_QUERY_BUFFERING:
3003     {
3004       gint percent;
3005       gboolean is_buffering;
3006       GstBufferingMode mode;
3007       gint avg_in, avg_out;
3008       gint64 buffering_left;
3009
3010       GST_DEBUG_OBJECT (queue, "query buffering");
3011
3012       get_buffering_percent (queue, &is_buffering, &percent);
3013       gst_query_set_buffering_percent (query, is_buffering, percent);
3014
3015       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3016           &buffering_left);
3017       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3018           buffering_left);
3019
3020       if (!QUEUE_IS_USING_QUEUE (queue)) {
3021         /* add ranges for download and ringbuffer buffering */
3022         GstFormat format;
3023         gint64 start, stop, range_start, range_stop;
3024         guint64 writing_pos;
3025         gint64 estimated_total;
3026         gint64 duration;
3027         gboolean peer_res, is_eos;
3028         GstQueue2Range *queued_ranges;
3029
3030         /* we need a current download region */
3031         if (queue->current == NULL)
3032           return FALSE;
3033
3034         writing_pos = queue->current->writing_pos;
3035         is_eos = queue->is_eos;
3036
3037         if (is_eos) {
3038           /* we're EOS, we know the duration in bytes now */
3039           peer_res = TRUE;
3040           duration = writing_pos;
3041         } else {
3042           /* get duration of upstream in bytes */
3043           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3044               GST_FORMAT_BYTES, &duration);
3045         }
3046
3047         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3048             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3049
3050         /* calculate remaining and total download time */
3051         if (peer_res && avg_in > 0.0)
3052           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3053         else
3054           estimated_total = -1;
3055
3056         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3057             estimated_total);
3058
3059         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3060
3061         switch (format) {
3062           case GST_FORMAT_PERCENT:
3063             /* we need duration */
3064             if (!peer_res)
3065               goto peer_failed;
3066
3067             start = 0;
3068             /* get our available data relative to the duration */
3069             if (duration != -1)
3070               stop =
3071                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3072                   duration);
3073             else
3074               stop = -1;
3075             break;
3076           case GST_FORMAT_BYTES:
3077             start = 0;
3078             stop = writing_pos;
3079             break;
3080           default:
3081             start = -1;
3082             stop = -1;
3083             break;
3084         }
3085
3086         /* fill out the buffered ranges */
3087         for (queued_ranges = queue->ranges; queued_ranges;
3088             queued_ranges = queued_ranges->next) {
3089           switch (format) {
3090             case GST_FORMAT_PERCENT:
3091               if (duration == -1) {
3092                 range_start = 0;
3093                 range_stop = 0;
3094                 break;
3095               }
3096               range_start =
3097                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3098                   queued_ranges->offset, duration);
3099               range_stop =
3100                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3101                   queued_ranges->writing_pos, duration);
3102               break;
3103             case GST_FORMAT_BYTES:
3104               range_start = queued_ranges->offset;
3105               range_stop = queued_ranges->writing_pos;
3106               break;
3107             default:
3108               range_start = -1;
3109               range_stop = -1;
3110               break;
3111           }
3112           if (range_start == range_stop)
3113             continue;
3114           GST_DEBUG_OBJECT (queue,
3115               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3116               G_GINT64_FORMAT, range_start, range_stop);
3117           gst_query_add_buffering_range (query, range_start, range_stop);
3118         }
3119
3120         gst_query_set_buffering_range (query, format, start, stop,
3121             estimated_total);
3122       }
3123       break;
3124     }
3125     case GST_QUERY_SCHEDULING:
3126     {
3127       gboolean pull_mode;
3128       GstSchedulingFlags flags = 0;
3129
3130       if (!gst_pad_peer_query (queue->sinkpad, query))
3131         goto peer_failed;
3132
3133       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3134
3135       /* we can operate in pull mode when we are using a tempfile */
3136       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3137
3138       if (pull_mode)
3139         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3140       gst_query_set_scheduling (query, flags, 0, -1, 0);
3141       if (pull_mode)
3142         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3143       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3144       break;
3145     }
3146     default:
3147       /* peer handled other queries */
3148       if (!gst_pad_query_default (pad, parent, query))
3149         goto peer_failed;
3150       break;
3151   }
3152
3153   return TRUE;
3154
3155   /* ERRORS */
3156 peer_failed:
3157   {
3158     GST_DEBUG_OBJECT (queue, "failed peer query");
3159     return FALSE;
3160   }
3161 }
3162
3163 static gboolean
3164 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3165 {
3166   GstQueue2 *queue = GST_QUEUE2 (element);
3167
3168   /* simply forward to the srcpad query function */
3169   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3170       query);
3171 }
3172
3173 static void
3174 gst_queue2_update_upstream_size (GstQueue2 * queue)
3175 {
3176   gint64 upstream_size = -1;
3177
3178   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3179           &upstream_size)) {
3180     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3181
3182     /* upstream_size can be negative but queue->upstream_size is unsigned.
3183      * Prevent setting negative values to it (the query can return -1) */
3184     if (upstream_size >= 0)
3185       queue->upstream_size = upstream_size;
3186     else
3187       queue->upstream_size = 0;
3188   }
3189 }
3190
3191 static GstFlowReturn
3192 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3193     guint length, GstBuffer ** buffer)
3194 {
3195   GstQueue2 *queue;
3196   GstFlowReturn ret;
3197
3198   queue = GST_QUEUE2_CAST (parent);
3199
3200   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3201   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3202   offset = (offset == -1) ? queue->current->reading_pos : offset;
3203
3204   GST_DEBUG_OBJECT (queue,
3205       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3206
3207   /* catch any reads beyond the size of the file here to make sure queue2
3208    * doesn't send seek events beyond the size of the file upstream, since
3209    * that would confuse elements such as souphttpsrc and/or http servers.
3210    * Demuxers often just loop until EOS at the end of the file to figure out
3211    * when they've read all the end-headers or index chunks. */
3212   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3213     gst_queue2_update_upstream_size (queue);
3214     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3215       goto out_unexpected;
3216   }
3217
3218   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3219     gst_queue2_update_upstream_size (queue);
3220     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3221       length = queue->upstream_size - offset;
3222       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3223     }
3224   }
3225
3226   /* FIXME - function will block when the range is not yet available */
3227   ret = gst_queue2_create_read (queue, offset, length, buffer);
3228   GST_QUEUE2_MUTEX_UNLOCK (queue);
3229   gst_queue2_post_buffering (queue);
3230
3231   return ret;
3232
3233   /* ERRORS */
3234 out_flushing:
3235   {
3236     ret = queue->srcresult;
3237
3238     GST_DEBUG_OBJECT (queue, "we are flushing");
3239     GST_QUEUE2_MUTEX_UNLOCK (queue);
3240     return ret;
3241   }
3242 out_unexpected:
3243   {
3244     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3245     GST_QUEUE2_MUTEX_UNLOCK (queue);
3246     return GST_FLOW_EOS;
3247   }
3248 }
3249
3250 /* sink currently only operates in push mode */
3251 static gboolean
3252 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3253     GstPadMode mode, gboolean active)
3254 {
3255   gboolean result;
3256   GstQueue2 *queue;
3257
3258   queue = GST_QUEUE2 (parent);
3259
3260   switch (mode) {
3261     case GST_PAD_MODE_PUSH:
3262       if (active) {
3263         GST_QUEUE2_MUTEX_LOCK (queue);
3264         GST_DEBUG_OBJECT (queue, "activating push mode");
3265         queue->srcresult = GST_FLOW_OK;
3266         queue->sinkresult = GST_FLOW_OK;
3267         queue->is_eos = FALSE;
3268         queue->unexpected = FALSE;
3269         reset_rate_timer (queue);
3270         GST_QUEUE2_MUTEX_UNLOCK (queue);
3271       } else {
3272         /* unblock chain function */
3273         GST_QUEUE2_MUTEX_LOCK (queue);
3274         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3275         queue->srcresult = GST_FLOW_FLUSHING;
3276         queue->sinkresult = GST_FLOW_FLUSHING;
3277         GST_QUEUE2_SIGNAL_DEL (queue);
3278         /* Unblock query handler */
3279         queue->last_query = FALSE;
3280         g_cond_signal (&queue->query_handled);
3281         GST_QUEUE2_MUTEX_UNLOCK (queue);
3282
3283         /* wait until it is unblocked and clean up */
3284         GST_PAD_STREAM_LOCK (pad);
3285         GST_QUEUE2_MUTEX_LOCK (queue);
3286         gst_queue2_locked_flush (queue, TRUE, FALSE);
3287         GST_QUEUE2_MUTEX_UNLOCK (queue);
3288         GST_PAD_STREAM_UNLOCK (pad);
3289       }
3290       result = TRUE;
3291       break;
3292     default:
3293       result = FALSE;
3294       break;
3295   }
3296   return result;
3297 }
3298
3299 /* src operating in push mode, we start a task on the source pad that pushes out
3300  * buffers from the queue */
3301 static gboolean
3302 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3303 {
3304   gboolean result = FALSE;
3305   GstQueue2 *queue;
3306
3307   queue = GST_QUEUE2 (parent);
3308
3309   if (active) {
3310     GST_QUEUE2_MUTEX_LOCK (queue);
3311     GST_DEBUG_OBJECT (queue, "activating push mode");
3312     queue->srcresult = GST_FLOW_OK;
3313     queue->sinkresult = GST_FLOW_OK;
3314     queue->is_eos = FALSE;
3315     queue->unexpected = FALSE;
3316     result =
3317         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3318     GST_QUEUE2_MUTEX_UNLOCK (queue);
3319   } else {
3320     /* unblock loop function */
3321     GST_QUEUE2_MUTEX_LOCK (queue);
3322     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3323     queue->srcresult = GST_FLOW_FLUSHING;
3324     queue->sinkresult = GST_FLOW_FLUSHING;
3325     /* the item add signal will unblock */
3326     GST_QUEUE2_SIGNAL_ADD (queue);
3327     GST_QUEUE2_MUTEX_UNLOCK (queue);
3328
3329     /* step 2, make sure streaming finishes */
3330     result = gst_pad_stop_task (pad);
3331   }
3332
3333   return result;
3334 }
3335
3336 /* pull mode, downstream will call our getrange function */
3337 static gboolean
3338 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3339 {
3340   gboolean result;
3341   GstQueue2 *queue;
3342
3343   queue = GST_QUEUE2 (parent);
3344
3345   if (active) {
3346     GST_QUEUE2_MUTEX_LOCK (queue);
3347     if (!QUEUE_IS_USING_QUEUE (queue)) {
3348       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3349         /* open the temp file now */
3350         result = gst_queue2_open_temp_location_file (queue);
3351       } else if (!queue->ring_buffer) {
3352         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3353         result = ! !queue->ring_buffer;
3354       } else {
3355         result = TRUE;
3356       }
3357
3358       GST_DEBUG_OBJECT (queue, "activating pull mode");
3359       init_ranges (queue);
3360       queue->srcresult = GST_FLOW_OK;
3361       queue->sinkresult = GST_FLOW_OK;
3362       queue->is_eos = FALSE;
3363       queue->unexpected = FALSE;
3364       queue->upstream_size = 0;
3365     } else {
3366       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3367       /* this is not allowed, we cannot operate in pull mode without a temp
3368        * file. */
3369       queue->srcresult = GST_FLOW_FLUSHING;
3370       queue->sinkresult = GST_FLOW_FLUSHING;
3371       result = FALSE;
3372     }
3373     GST_QUEUE2_MUTEX_UNLOCK (queue);
3374   } else {
3375     GST_QUEUE2_MUTEX_LOCK (queue);
3376     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3377     queue->srcresult = GST_FLOW_FLUSHING;
3378     queue->sinkresult = GST_FLOW_FLUSHING;
3379     /* this will unlock getrange */
3380     GST_QUEUE2_SIGNAL_ADD (queue);
3381     result = TRUE;
3382     GST_QUEUE2_MUTEX_UNLOCK (queue);
3383   }
3384
3385   return result;
3386 }
3387
3388 static gboolean
3389 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3390     gboolean active)
3391 {
3392   gboolean res;
3393
3394   switch (mode) {
3395     case GST_PAD_MODE_PULL:
3396       res = gst_queue2_src_activate_pull (pad, parent, active);
3397       break;
3398     case GST_PAD_MODE_PUSH:
3399       res = gst_queue2_src_activate_push (pad, parent, active);
3400       break;
3401     default:
3402       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3403       res = FALSE;
3404       break;
3405   }
3406   return res;
3407 }
3408
3409 static GstStateChangeReturn
3410 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3411 {
3412   GstQueue2 *queue;
3413   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3414
3415   queue = GST_QUEUE2 (element);
3416
3417   switch (transition) {
3418     case GST_STATE_CHANGE_NULL_TO_READY:
3419       break;
3420     case GST_STATE_CHANGE_READY_TO_PAUSED:
3421       GST_QUEUE2_MUTEX_LOCK (queue);
3422       if (!QUEUE_IS_USING_QUEUE (queue)) {
3423         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3424           if (!gst_queue2_open_temp_location_file (queue))
3425             ret = GST_STATE_CHANGE_FAILURE;
3426         } else {
3427           if (queue->ring_buffer) {
3428             g_free (queue->ring_buffer);
3429             queue->ring_buffer = NULL;
3430           }
3431           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3432             ret = GST_STATE_CHANGE_FAILURE;
3433         }
3434         init_ranges (queue);
3435       }
3436       queue->segment_event_received = FALSE;
3437       queue->starting_segment = NULL;
3438       gst_event_replace (&queue->stream_start_event, NULL);
3439       GST_QUEUE2_MUTEX_UNLOCK (queue);
3440       break;
3441     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3442       break;
3443     default:
3444       break;
3445   }
3446
3447   if (ret == GST_STATE_CHANGE_FAILURE)
3448     return ret;
3449
3450   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3451
3452   if (ret == GST_STATE_CHANGE_FAILURE)
3453     return ret;
3454
3455   switch (transition) {
3456     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3457       break;
3458     case GST_STATE_CHANGE_PAUSED_TO_READY:
3459       GST_QUEUE2_MUTEX_LOCK (queue);
3460       if (!QUEUE_IS_USING_QUEUE (queue)) {
3461         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3462           gst_queue2_close_temp_location_file (queue);
3463         } else if (queue->ring_buffer) {
3464           g_free (queue->ring_buffer);
3465           queue->ring_buffer = NULL;
3466         }
3467         clean_ranges (queue);
3468       }
3469       if (queue->starting_segment != NULL) {
3470         gst_event_unref (queue->starting_segment);
3471         queue->starting_segment = NULL;
3472       }
3473       gst_event_replace (&queue->stream_start_event, NULL);
3474       GST_QUEUE2_MUTEX_UNLOCK (queue);
3475       break;
3476     case GST_STATE_CHANGE_READY_TO_NULL:
3477       break;
3478     default:
3479       break;
3480   }
3481
3482   return ret;
3483 }
3484
3485 /* changing the capacity of the queue must wake up
3486  * the _chain function, it might have more room now
3487  * to store the buffer/event in the queue */
3488 #define QUEUE_CAPACITY_CHANGE(q) \
3489   GST_QUEUE2_SIGNAL_DEL (queue); \
3490   if (queue->use_buffering)      \
3491     update_buffering (queue);
3492
3493 /* Changing the minimum required fill level must
3494  * wake up the _loop function as it might now
3495  * be able to preceed.
3496  */
3497 #define QUEUE_THRESHOLD_CHANGE(q)\
3498   GST_QUEUE2_SIGNAL_ADD (queue);
3499
3500 static void
3501 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3502 {
3503   GstState state;
3504
3505   /* the element must be stopped in order to do this */
3506   GST_OBJECT_LOCK (queue);
3507   state = GST_STATE (queue);
3508   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3509     goto wrong_state;
3510   GST_OBJECT_UNLOCK (queue);
3511
3512   /* set new location */
3513   g_free (queue->temp_template);
3514   queue->temp_template = g_strdup (template);
3515
3516   return;
3517
3518 /* ERROR */
3519 wrong_state:
3520   {
3521     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3522     GST_OBJECT_UNLOCK (queue);
3523   }
3524 }
3525
3526 static void
3527 gst_queue2_set_property (GObject * object,
3528     guint prop_id, const GValue * value, GParamSpec * pspec)
3529 {
3530   GstQueue2 *queue = GST_QUEUE2 (object);
3531
3532   /* someone could change levels here, and since this
3533    * affects the get/put funcs, we need to lock for safety. */
3534   GST_QUEUE2_MUTEX_LOCK (queue);
3535
3536   switch (prop_id) {
3537     case PROP_MAX_SIZE_BYTES:
3538       queue->max_level.bytes = g_value_get_uint (value);
3539       QUEUE_CAPACITY_CHANGE (queue);
3540       break;
3541     case PROP_MAX_SIZE_BUFFERS:
3542       queue->max_level.buffers = g_value_get_uint (value);
3543       QUEUE_CAPACITY_CHANGE (queue);
3544       break;
3545     case PROP_MAX_SIZE_TIME:
3546       queue->max_level.time = g_value_get_uint64 (value);
3547       /* set rate_time to the same value. We use an extra field in the level
3548        * structure so that we can easily access and compare it */
3549       queue->max_level.rate_time = queue->max_level.time;
3550       QUEUE_CAPACITY_CHANGE (queue);
3551       break;
3552     case PROP_USE_BUFFERING:
3553       queue->use_buffering = g_value_get_boolean (value);
3554       if (!queue->use_buffering && queue->is_buffering) {
3555         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3556             "posting 100%% message");
3557         SET_PERCENT (queue, 100);
3558         queue->is_buffering = FALSE;
3559       }
3560
3561       if (queue->use_buffering) {
3562         queue->is_buffering = TRUE;
3563         update_buffering (queue);
3564       }
3565       break;
3566     case PROP_USE_RATE_ESTIMATE:
3567       queue->use_rate_estimate = g_value_get_boolean (value);
3568       break;
3569     case PROP_LOW_PERCENT:
3570       queue->low_percent = g_value_get_int (value);
3571       break;
3572     case PROP_HIGH_PERCENT:
3573       queue->high_percent = g_value_get_int (value);
3574       break;
3575     case PROP_TEMP_TEMPLATE:
3576       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3577       break;
3578     case PROP_TEMP_REMOVE:
3579       queue->temp_remove = g_value_get_boolean (value);
3580       break;
3581     case PROP_RING_BUFFER_MAX_SIZE:
3582       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3583       break;
3584     default:
3585       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3586       break;
3587   }
3588
3589   GST_QUEUE2_MUTEX_UNLOCK (queue);
3590   gst_queue2_post_buffering (queue);
3591 }
3592
3593 static void
3594 gst_queue2_get_property (GObject * object,
3595     guint prop_id, GValue * value, GParamSpec * pspec)
3596 {
3597   GstQueue2 *queue = GST_QUEUE2 (object);
3598
3599   GST_QUEUE2_MUTEX_LOCK (queue);
3600
3601   switch (prop_id) {
3602     case PROP_CUR_LEVEL_BYTES:
3603       g_value_set_uint (value, queue->cur_level.bytes);
3604       break;
3605     case PROP_CUR_LEVEL_BUFFERS:
3606       g_value_set_uint (value, queue->cur_level.buffers);
3607       break;
3608     case PROP_CUR_LEVEL_TIME:
3609       g_value_set_uint64 (value, queue->cur_level.time);
3610       break;
3611     case PROP_MAX_SIZE_BYTES:
3612       g_value_set_uint (value, queue->max_level.bytes);
3613       break;
3614     case PROP_MAX_SIZE_BUFFERS:
3615       g_value_set_uint (value, queue->max_level.buffers);
3616       break;
3617     case PROP_MAX_SIZE_TIME:
3618       g_value_set_uint64 (value, queue->max_level.time);
3619       break;
3620     case PROP_USE_BUFFERING:
3621       g_value_set_boolean (value, queue->use_buffering);
3622       break;
3623     case PROP_USE_RATE_ESTIMATE:
3624       g_value_set_boolean (value, queue->use_rate_estimate);
3625       break;
3626     case PROP_LOW_PERCENT:
3627       g_value_set_int (value, queue->low_percent);
3628       break;
3629     case PROP_HIGH_PERCENT:
3630       g_value_set_int (value, queue->high_percent);
3631       break;
3632     case PROP_TEMP_TEMPLATE:
3633       g_value_set_string (value, queue->temp_template);
3634       break;
3635     case PROP_TEMP_LOCATION:
3636       g_value_set_string (value, queue->temp_location);
3637       break;
3638     case PROP_TEMP_REMOVE:
3639       g_value_set_boolean (value, queue->temp_remove);
3640       break;
3641     case PROP_RING_BUFFER_MAX_SIZE:
3642       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3643       break;
3644     case PROP_AVG_IN_RATE:
3645     {
3646       gdouble in_rate = queue->byte_in_rate;
3647
3648       /* During the first RATE_INTERVAL, byte_in_rate will not have been
3649        * calculated, so calculate it here. */
3650       if (in_rate == 0.0 && queue->bytes_in
3651           && queue->last_update_in_rates_elapsed > 0.0)
3652         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
3653
3654       g_value_set_int64 (value, (gint64) in_rate);
3655       break;
3656     }
3657     default:
3658       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3659       break;
3660   }
3661
3662   GST_QUEUE2_MUTEX_UNLOCK (queue);
3663 }