queue2: fix fill level arithmetic overflow with large values
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue2:current-level-buffers property.
40  *
41  * The default queue size limits are 100 buffers, 2MB of data, or
42  * two seconds worth of data, whichever is reached first.
43  *
44  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
45  * will allocate a random free filename and buffer data in the file.
46  * By using this, it will buffer the entire stream data on the file independently
47  * of the queue size limits, they will only be used for buffering statistics.
48  *
49  * The temp-location property will be used to notify the application of the
50  * allocated filename.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56
57 #include "gstqueue2.h"
58
59 #include <glib/gstdio.h>
60
61 #include "gst/gst-i18n-lib.h"
62 #include "gst/glib-compat-private.h"
63
64 #include <string.h>
65
66 #ifdef G_OS_WIN32
67 #include <io.h>                 /* lseek, open, close, read */
68 #undef lseek
69 #define lseek _lseeki64
70 #undef off_t
71 #define off_t guint64
72 #else
73 #include <unistd.h>
74 #endif
75
76 #ifdef __BIONIC__               /* Android */
77 #undef lseek
78 #define lseek lseek64
79 #undef off_t
80 #define off_t guint64
81 #include <fcntl.h>
82 #endif
83
84 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
85     GST_PAD_SINK,
86     GST_PAD_ALWAYS,
87     GST_STATIC_CAPS_ANY);
88
89 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
90     GST_PAD_SRC,
91     GST_PAD_ALWAYS,
92     GST_STATIC_CAPS_ANY);
93
94 GST_DEBUG_CATEGORY_STATIC (queue_debug);
95 #define GST_CAT_DEFAULT (queue_debug)
96 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
97
98 enum
99 {
100   SIGNAL_OVERRUN,
101   LAST_SIGNAL
102 };
103
104 /* other defines */
105 #define DEFAULT_BUFFER_SIZE 4096
106 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
107 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
108 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
109
110 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
111
112 /* default property values */
113 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
114 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
115 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
116 #define DEFAULT_USE_BUFFERING      FALSE
117 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
118 #define DEFAULT_LOW_PERCENT        10
119 #define DEFAULT_HIGH_PERCENT       99
120 #define DEFAULT_TEMP_REMOVE        TRUE
121 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
122
123 enum
124 {
125   PROP_0,
126   PROP_CUR_LEVEL_BUFFERS,
127   PROP_CUR_LEVEL_BYTES,
128   PROP_CUR_LEVEL_TIME,
129   PROP_MAX_SIZE_BUFFERS,
130   PROP_MAX_SIZE_BYTES,
131   PROP_MAX_SIZE_TIME,
132   PROP_USE_BUFFERING,
133   PROP_USE_RATE_ESTIMATE,
134   PROP_LOW_PERCENT,
135   PROP_HIGH_PERCENT,
136   PROP_TEMP_TEMPLATE,
137   PROP_TEMP_LOCATION,
138   PROP_TEMP_REMOVE,
139   PROP_RING_BUFFER_MAX_SIZE,
140   PROP_AVG_IN_RATE,
141   PROP_LAST
142 };
143
144 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
145   l.buffers = 0;                                        \
146   l.bytes = 0;                                          \
147   l.time = 0;                                           \
148   l.rate_time = 0;                                      \
149 } G_STMT_END
150
151 #define STATUS(queue, pad, msg) \
152   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
153                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
154                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
155                       " ns, %"G_GUINT64_FORMAT" items", \
156                       GST_DEBUG_PAD_NAME (pad), \
157                       queue->cur_level.buffers, \
158                       queue->max_level.buffers, \
159                       queue->cur_level.bytes, \
160                       queue->max_level.bytes, \
161                       queue->cur_level.time, \
162                       queue->max_level.time, \
163                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
164                         queue->current->writing_pos - queue->current->max_reading_pos : \
165                         queue->queue.length))
166
167 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
168   g_mutex_lock (&q->qlock);                                              \
169 } G_STMT_END
170
171 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
172   GST_QUEUE2_MUTEX_LOCK (q);                                            \
173   if (res != GST_FLOW_OK)                                               \
174     goto label;                                                         \
175 } G_STMT_END
176
177 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
178   g_mutex_unlock (&q->qlock);                                            \
179 } G_STMT_END
180
181 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
182   STATUS (queue, q->sinkpad, "wait for DEL");                           \
183   q->waiting_del = TRUE;                                                \
184   g_cond_wait (&q->item_del, &queue->qlock);                              \
185   q->waiting_del = FALSE;                                               \
186   if (res != GST_FLOW_OK) {                                             \
187     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
188     goto label;                                                         \
189   }                                                                     \
190   STATUS (queue, q->sinkpad, "received DEL");                           \
191 } G_STMT_END
192
193 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
194   STATUS (queue, q->srcpad, "wait for ADD");                            \
195   q->waiting_add = TRUE;                                                \
196   g_cond_wait (&q->item_add, &q->qlock);                                  \
197   q->waiting_add = FALSE;                                               \
198   if (res != GST_FLOW_OK) {                                             \
199     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
200     goto label;                                                         \
201   }                                                                     \
202   STATUS (queue, q->srcpad, "received ADD");                            \
203 } G_STMT_END
204
205 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
206   if (q->waiting_del) {                                                 \
207     STATUS (q, q->srcpad, "signal DEL");                                \
208     g_cond_signal (&q->item_del);                                        \
209   }                                                                     \
210 } G_STMT_END
211
212 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
213   if (q->waiting_add) {                                                 \
214     STATUS (q, q->sinkpad, "signal ADD");                               \
215     g_cond_signal (&q->item_add);                                        \
216   }                                                                     \
217 } G_STMT_END
218
219 #define SET_PERCENT(q, perc) G_STMT_START {                              \
220   if (perc != q->buffering_percent) {                                    \
221     q->buffering_percent = perc;                                         \
222     q->percent_changed = TRUE;                                           \
223     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
224     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
225         &q->buffering_left);                                             \
226   }                                                                      \
227 } G_STMT_END
228
229 #define _do_init \
230     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
231     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
232         "dataflow inside the queue element");
233 #define gst_queue2_parent_class parent_class
234 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
235
236 static void gst_queue2_finalize (GObject * object);
237
238 static void gst_queue2_set_property (GObject * object,
239     guint prop_id, const GValue * value, GParamSpec * pspec);
240 static void gst_queue2_get_property (GObject * object,
241     guint prop_id, GValue * value, GParamSpec * pspec);
242
243 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
244     GstBuffer * buffer);
245 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
246     GstBufferList * buffer_list);
247 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
248 static void gst_queue2_loop (GstPad * pad);
249
250 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
251     GstEvent * event);
252 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
253     GstQuery * query);
254
255 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
256     GstEvent * event);
257 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
258     GstQuery * query);
259 static gboolean gst_queue2_handle_query (GstElement * element,
260     GstQuery * query);
261
262 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
263     guint64 offset, guint length, GstBuffer ** buffer);
264
265 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
266     GstPadMode mode, gboolean active);
267 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
268     GstPadMode mode, gboolean active);
269 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
270     GstStateChange transition);
271
272 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
273 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
274
275 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
276 static void update_in_rates (GstQueue2 * queue);
277 static void gst_queue2_post_buffering (GstQueue2 * queue);
278
279 typedef enum
280 {
281   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
282   GST_QUEUE2_ITEM_TYPE_BUFFER,
283   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
284   GST_QUEUE2_ITEM_TYPE_EVENT,
285   GST_QUEUE2_ITEM_TYPE_QUERY
286 } GstQueue2ItemType;
287
288 typedef struct
289 {
290   GstQueue2ItemType type;
291   GstMiniObject *item;
292 } GstQueue2Item;
293
294 static guint gst_queue2_signals[LAST_SIGNAL] = { 0 };
295
296 static void
297 gst_queue2_class_init (GstQueue2Class * klass)
298 {
299   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
300   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
301
302   gobject_class->set_property = gst_queue2_set_property;
303   gobject_class->get_property = gst_queue2_get_property;
304
305   /* signals */
306   /**
307    * GstQueue2::overrun:
308    * @queue: the queue2 instance
309    *
310    * Reports that the buffer became full (overrun).
311    * A buffer is full if the total amount of data inside it (num-buffers, time,
312    * size) is higher than the boundary values which can be set through the
313    * GObject properties.
314    *
315    * Since: 1.8
316    */
317   gst_queue2_signals[SIGNAL_OVERRUN] =
318       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
319       G_STRUCT_OFFSET (GstQueue2Class, overrun), NULL, NULL,
320       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
321
322   /* properties */
323   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
324       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
325           "Current amount of data in the queue (bytes)",
326           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
327   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
328       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
329           "Current number of buffers in the queue",
330           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
331   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
332       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
333           "Current amount of data in the queue (in ns)",
334           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
335
336   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
338           "Max. amount of data in the queue (bytes, 0=disable)",
339           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
340           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
341           G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
343       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
344           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
345           DEFAULT_MAX_SIZE_BUFFERS,
346           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
347           G_PARAM_STATIC_STRINGS));
348   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
349       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
350           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
351           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
352           G_PARAM_STATIC_STRINGS));
353
354   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
355       g_param_spec_boolean ("use-buffering", "Use buffering",
356           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
357           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
358           G_PARAM_STATIC_STRINGS));
359   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
360       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
361           "Estimate the bitrate of the stream to calculate time level",
362           DEFAULT_USE_RATE_ESTIMATE,
363           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
365       g_param_spec_int ("low-percent", "Low percent",
366           "Low threshold for buffering to start. Only used if use-buffering is True",
367           0, 100, DEFAULT_LOW_PERCENT,
368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
370       g_param_spec_int ("high-percent", "High percent",
371           "High threshold for buffering to finish. Only used if use-buffering is True",
372           0, 100, DEFAULT_HIGH_PERCENT,
373           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374
375   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
376       g_param_spec_string ("temp-template", "Temporary File Template",
377           "File template to store temporary files in, should contain directory "
378           "and XXXXXX. (NULL == disabled)",
379           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380
381   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
382       g_param_spec_string ("temp-location", "Temporary File Location",
383           "Location to store temporary files in (Only read this property, "
384           "use temp-template to configure the name template)",
385           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
386
387   /**
388    * GstQueue2:temp-remove
389    *
390    * When temp-template is set, remove the temporary file when going to READY.
391    */
392   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
393       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
394           "Remove the temp-location after use",
395           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
396
397   /**
398    * GstQueue2:ring-buffer-max-size
399    *
400    * The maximum size of the ring buffer in bytes. If set to 0, the ring
401    * buffer is disabled. Default 0.
402    */
403   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
404       g_param_spec_uint64 ("ring-buffer-max-size",
405           "Max. ring buffer size (bytes)",
406           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
407           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
408           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409
410   /**
411    * GstQueue2:avg-in-rate
412    *
413    * The average input data rate.
414    */
415   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
416       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
417           "Average input data rate (bytes/s)",
418           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
419
420   /* set several parent class virtual functions */
421   gobject_class->finalize = gst_queue2_finalize;
422
423   gst_element_class_add_pad_template (gstelement_class,
424       gst_static_pad_template_get (&srctemplate));
425   gst_element_class_add_pad_template (gstelement_class,
426       gst_static_pad_template_get (&sinktemplate));
427
428   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
429       "Generic",
430       "Simple data queue",
431       "Erik Walthinsen <omega@cse.ogi.edu>, "
432       "Wim Taymans <wim.taymans@gmail.com>");
433
434   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
435   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
436 }
437
438 static void
439 gst_queue2_init (GstQueue2 * queue)
440 {
441   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
442
443   gst_pad_set_chain_function (queue->sinkpad,
444       GST_DEBUG_FUNCPTR (gst_queue2_chain));
445   gst_pad_set_chain_list_function (queue->sinkpad,
446       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
447   gst_pad_set_activatemode_function (queue->sinkpad,
448       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
449   gst_pad_set_event_function (queue->sinkpad,
450       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
451   gst_pad_set_query_function (queue->sinkpad,
452       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
453   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
454   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
455
456   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
457
458   gst_pad_set_activatemode_function (queue->srcpad,
459       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
460   gst_pad_set_getrange_function (queue->srcpad,
461       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
462   gst_pad_set_event_function (queue->srcpad,
463       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
464   gst_pad_set_query_function (queue->srcpad,
465       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
466   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
467   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
468
469   /* levels */
470   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
471   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
472   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
473   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
474   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
475   queue->use_buffering = DEFAULT_USE_BUFFERING;
476   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
477   queue->low_percent = DEFAULT_LOW_PERCENT;
478   queue->high_percent = DEFAULT_HIGH_PERCENT;
479
480   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
481   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
482
483   queue->sinktime = GST_CLOCK_TIME_NONE;
484   queue->srctime = GST_CLOCK_TIME_NONE;
485   queue->sink_tainted = TRUE;
486   queue->src_tainted = TRUE;
487
488   queue->srcresult = GST_FLOW_FLUSHING;
489   queue->sinkresult = GST_FLOW_FLUSHING;
490   queue->is_eos = FALSE;
491   queue->in_timer = g_timer_new ();
492   queue->out_timer = g_timer_new ();
493
494   g_mutex_init (&queue->qlock);
495   queue->waiting_add = FALSE;
496   g_cond_init (&queue->item_add);
497   queue->waiting_del = FALSE;
498   g_cond_init (&queue->item_del);
499   g_queue_init (&queue->queue);
500
501   g_cond_init (&queue->query_handled);
502   queue->last_query = FALSE;
503
504   g_mutex_init (&queue->buffering_post_lock);
505   queue->buffering_percent = 100;
506
507   /* tempfile related */
508   queue->temp_template = NULL;
509   queue->temp_location = NULL;
510   queue->temp_remove = DEFAULT_TEMP_REMOVE;
511
512   queue->ring_buffer = NULL;
513   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
514
515   GST_DEBUG_OBJECT (queue,
516       "initialized queue's not_empty & not_full conditions");
517 }
518
519 /* called only once, as opposed to dispose */
520 static void
521 gst_queue2_finalize (GObject * object)
522 {
523   GstQueue2 *queue = GST_QUEUE2 (object);
524
525   GST_DEBUG_OBJECT (queue, "finalizing queue");
526
527   while (!g_queue_is_empty (&queue->queue)) {
528     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
529
530     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
531       gst_mini_object_unref (qitem->item);
532     g_slice_free (GstQueue2Item, qitem);
533   }
534
535   queue->last_query = FALSE;
536   g_queue_clear (&queue->queue);
537   g_mutex_clear (&queue->qlock);
538   g_mutex_clear (&queue->buffering_post_lock);
539   g_cond_clear (&queue->item_add);
540   g_cond_clear (&queue->item_del);
541   g_cond_clear (&queue->query_handled);
542   g_timer_destroy (queue->in_timer);
543   g_timer_destroy (queue->out_timer);
544
545   /* temp_file path cleanup  */
546   g_free (queue->temp_template);
547   g_free (queue->temp_location);
548
549   G_OBJECT_CLASS (parent_class)->finalize (object);
550 }
551
552 static void
553 debug_ranges (GstQueue2 * queue)
554 {
555   GstQueue2Range *walk;
556
557   for (walk = queue->ranges; walk; walk = walk->next) {
558     GST_DEBUG_OBJECT (queue,
559         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
560         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
561         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
562         walk->rb_writing_pos, walk->reading_pos,
563         walk == queue->current ? "**y**" : "  n  ");
564   }
565 }
566
567 /* clear all the downloaded ranges */
568 static void
569 clean_ranges (GstQueue2 * queue)
570 {
571   GST_DEBUG_OBJECT (queue, "clean queue ranges");
572
573   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
574   queue->ranges = NULL;
575   queue->current = NULL;
576 }
577
578 /* find a range that contains @offset or NULL when nothing does */
579 static GstQueue2Range *
580 find_range (GstQueue2 * queue, guint64 offset)
581 {
582   GstQueue2Range *range = NULL;
583   GstQueue2Range *walk;
584
585   /* first do a quick check for the current range */
586   for (walk = queue->ranges; walk; walk = walk->next) {
587     if (offset >= walk->offset && offset <= walk->writing_pos) {
588       /* we can reuse an existing range */
589       range = walk;
590       break;
591     }
592   }
593   if (range) {
594     GST_DEBUG_OBJECT (queue,
595         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
596         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
597   } else {
598     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
599   }
600   return range;
601 }
602
603 static void
604 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
605 {
606   guint64 max_reading_pos, writing_pos;
607
608   writing_pos = range->writing_pos;
609   max_reading_pos = range->max_reading_pos;
610
611   if (writing_pos > max_reading_pos)
612     queue->cur_level.bytes = writing_pos - max_reading_pos;
613   else
614     queue->cur_level.bytes = 0;
615 }
616
617 /* make a new range for @offset or reuse an existing range */
618 static GstQueue2Range *
619 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
620 {
621   GstQueue2Range *range, *prev, *next;
622
623   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
624
625   if ((range = find_range (queue, offset))) {
626     GST_DEBUG_OBJECT (queue,
627         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
628         range->writing_pos);
629     if (update_existing && range->writing_pos != offset) {
630       GST_DEBUG_OBJECT (queue, "updating range writing position to "
631           "%" G_GUINT64_FORMAT, offset);
632       range->writing_pos = offset;
633     }
634   } else {
635     GST_DEBUG_OBJECT (queue,
636         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
637
638     range = g_slice_new0 (GstQueue2Range);
639     range->offset = offset;
640     /* we want to write to the next location in the ring buffer */
641     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
642     range->writing_pos = offset;
643     range->rb_writing_pos = range->rb_offset;
644     range->reading_pos = offset;
645     range->max_reading_pos = offset;
646
647     /* insert sorted */
648     prev = NULL;
649     next = queue->ranges;
650     while (next) {
651       if (next->offset > offset) {
652         /* insert before next */
653         GST_DEBUG_OBJECT (queue,
654             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
655             next->offset);
656         break;
657       }
658       /* try next */
659       prev = next;
660       next = next->next;
661     }
662     range->next = next;
663     if (prev)
664       prev->next = range;
665     else
666       queue->ranges = range;
667   }
668   debug_ranges (queue);
669
670   /* update the stats for this range */
671   update_cur_level (queue, range);
672
673   return range;
674 }
675
676
677 /* clear and init the download ranges for offset 0 */
678 static void
679 init_ranges (GstQueue2 * queue)
680 {
681   GST_DEBUG_OBJECT (queue, "init queue ranges");
682
683   /* get rid of all the current ranges */
684   clean_ranges (queue);
685   /* make a range for offset 0 */
686   queue->current = add_range (queue, 0, TRUE);
687 }
688
689 /* calculate the diff between running time on the sink and src of the queue.
690  * This is the total amount of time in the queue. */
691 static void
692 update_time_level (GstQueue2 * queue)
693 {
694   if (queue->sink_tainted) {
695     queue->sinktime =
696         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
697         queue->sink_segment.position);
698     queue->sink_tainted = FALSE;
699   }
700
701   if (queue->src_tainted) {
702     queue->srctime =
703         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
704         queue->src_segment.position);
705     queue->src_tainted = FALSE;
706   }
707
708   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
709       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
710
711   if (queue->sinktime != GST_CLOCK_TIME_NONE
712       && queue->srctime != GST_CLOCK_TIME_NONE
713       && queue->sinktime >= queue->srctime)
714     queue->cur_level.time = queue->sinktime - queue->srctime;
715   else
716     queue->cur_level.time = 0;
717 }
718
719 /* take a SEGMENT event and apply the values to segment, updating the time
720  * level of queue. */
721 static void
722 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
723     gboolean is_sink)
724 {
725   gst_event_copy_segment (event, segment);
726
727   if (segment->format == GST_FORMAT_BYTES) {
728     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
729       /* start is where we'll be getting from and as such writing next */
730       queue->current = add_range (queue, segment->start, TRUE);
731     }
732   }
733
734   /* now configure the values, we use these to track timestamps on the
735    * sinkpad. */
736   if (segment->format != GST_FORMAT_TIME) {
737     /* non-time format, pretend the current time segment is closed with a
738      * 0 start and unknown stop time. */
739     segment->format = GST_FORMAT_TIME;
740     segment->start = 0;
741     segment->stop = -1;
742     segment->time = 0;
743   }
744
745   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
746
747   if (is_sink)
748     queue->sink_tainted = TRUE;
749   else
750     queue->src_tainted = TRUE;
751
752   /* segment can update the time level of the queue */
753   update_time_level (queue);
754 }
755
756 static void
757 apply_gap (GstQueue2 * queue, GstEvent * event,
758     GstSegment * segment, gboolean is_sink)
759 {
760   GstClockTime timestamp;
761   GstClockTime duration;
762
763   gst_event_parse_gap (event, &timestamp, &duration);
764
765   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
766
767     if (GST_CLOCK_TIME_IS_VALID (duration)) {
768       timestamp += duration;
769     }
770
771     segment->position = timestamp;
772
773     if (is_sink)
774       queue->sink_tainted = TRUE;
775     else
776       queue->src_tainted = TRUE;
777
778     /* calc diff with other end */
779     update_time_level (queue);
780   }
781 }
782
783 /* take a buffer and update segment, updating the time level of the queue. */
784 static void
785 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
786     gboolean is_sink)
787 {
788   GstClockTime duration, timestamp;
789
790   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
791   duration = GST_BUFFER_DURATION (buffer);
792
793   /* if no timestamp is set, assume it's continuous with the previous
794    * time */
795   if (timestamp == GST_CLOCK_TIME_NONE)
796     timestamp = segment->position;
797
798   /* add duration */
799   if (duration != GST_CLOCK_TIME_NONE)
800     timestamp += duration;
801
802   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
803       GST_TIME_ARGS (timestamp));
804
805   segment->position = timestamp;
806
807   if (is_sink)
808     queue->sink_tainted = TRUE;
809   else
810     queue->src_tainted = TRUE;
811
812   /* calc diff with other end */
813   update_time_level (queue);
814 }
815
816 static gboolean
817 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
818 {
819   GstClockTime *timestamp = data;
820   GstClockTime btime;
821
822   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
823       " duration %" GST_TIME_FORMAT, idx,
824       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
825       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
826       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
827
828   btime = GST_BUFFER_DTS_OR_PTS (*buf);
829   if (GST_CLOCK_TIME_IS_VALID (btime))
830     *timestamp = btime;
831
832   if (GST_BUFFER_DURATION_IS_VALID (*buf))
833     *timestamp += GST_BUFFER_DURATION (*buf);
834
835   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
836   return TRUE;
837 }
838
839 /* take a buffer list and update segment, updating the time level of the queue */
840 static void
841 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
842     GstSegment * segment, gboolean is_sink)
843 {
844   GstClockTime timestamp;
845
846   /* if no timestamp is set, assume it's continuous with the previous time */
847   timestamp = segment->position;
848
849   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
850
851   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
852       GST_TIME_ARGS (timestamp));
853
854   segment->position = timestamp;
855
856   if (is_sink)
857     queue->sink_tainted = TRUE;
858   else
859     queue->src_tainted = TRUE;
860
861   /* calc diff with other end */
862   update_time_level (queue);
863 }
864
865 static inline gint
866 get_percent (guint64 cur_level, guint64 max_level, guint64 alt_max)
867 {
868   guint64 p;
869
870   if (max_level == 0)
871     return 0;
872
873   if (alt_max > 0)
874     p = gst_util_uint64_scale (cur_level, 100, MIN (max_level, alt_max));
875   else
876     p = gst_util_uint64_scale (cur_level, 100, max_level);
877
878   return MIN (p, 100);
879 }
880
881 static gboolean
882 get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering,
883     gint * percent)
884 {
885   gint perc;
886
887   if (queue->high_percent <= 0) {
888     if (percent)
889       *percent = 100;
890     if (is_buffering)
891       *is_buffering = FALSE;
892     return FALSE;
893   }
894 #define GET_PERCENT(format,alt_max) \
895     get_percent(queue->cur_level.format,queue->max_level.format,(alt_max))
896
897   if (queue->is_eos) {
898     /* on EOS we are always 100% full, we set the var here so that it we can
899      * reuse the logic below to stop buffering */
900     perc = 100;
901     GST_LOG_OBJECT (queue, "we are EOS");
902   } else {
903     GST_LOG_OBJECT (queue,
904         "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
905         queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
906         queue->cur_level.buffers);
907
908     /* figure out the percent we are filled, we take the max of all formats. */
909     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
910       perc = GET_PERCENT (bytes, 0);
911     } else {
912       guint64 rb_size = queue->ring_buffer_max_size;
913       perc = GET_PERCENT (bytes, rb_size);
914     }
915     perc = MAX (perc, GET_PERCENT (time, 0));
916     perc = MAX (perc, GET_PERCENT (buffers, 0));
917
918     /* also apply the rate estimate when we need to */
919     if (queue->use_rate_estimate)
920       perc = MAX (perc, GET_PERCENT (rate_time, 0));
921
922     /* Don't get to 0% unless we're really empty */
923     if (queue->cur_level.bytes > 0)
924       perc = MAX (1, perc);
925   }
926 #undef GET_PERCENT
927
928   if (is_buffering)
929     *is_buffering = queue->is_buffering;
930
931   /* scale to high percent so that it becomes the 100% mark */
932   perc = perc * 100 / queue->high_percent;
933   /* clip */
934   if (perc > 100)
935     perc = 100;
936
937   if (percent)
938     *percent = perc;
939
940   GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering,
941       perc);
942
943   return TRUE;
944 }
945
946 static void
947 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
948     gint * avg_in, gint * avg_out, gint64 * buffering_left)
949 {
950   if (mode) {
951     if (!QUEUE_IS_USING_QUEUE (queue)) {
952       if (QUEUE_IS_USING_RING_BUFFER (queue))
953         *mode = GST_BUFFERING_TIMESHIFT;
954       else
955         *mode = GST_BUFFERING_DOWNLOAD;
956     } else {
957       *mode = GST_BUFFERING_STREAM;
958     }
959   }
960
961   if (avg_in)
962     *avg_in = queue->byte_in_rate;
963   if (avg_out)
964     *avg_out = queue->byte_out_rate;
965
966   if (buffering_left) {
967     *buffering_left = (percent == 100 ? 0 : -1);
968
969     if (queue->use_rate_estimate) {
970       guint64 max, cur;
971
972       max = queue->max_level.rate_time;
973       cur = queue->cur_level.rate_time;
974
975       if (percent != 100 && max > cur)
976         *buffering_left = (max - cur) / 1000000;
977     }
978   }
979 }
980
981 static void
982 gst_queue2_post_buffering (GstQueue2 * queue)
983 {
984   GstMessage *msg = NULL;
985
986   g_mutex_lock (&queue->buffering_post_lock);
987   GST_QUEUE2_MUTEX_LOCK (queue);
988   if (queue->percent_changed) {
989     gint percent = queue->buffering_percent;
990
991     queue->percent_changed = FALSE;
992
993     GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
994     msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
995
996     gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
997         queue->avg_out, queue->buffering_left);
998   }
999   GST_QUEUE2_MUTEX_UNLOCK (queue);
1000
1001   if (msg != NULL)
1002     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
1003
1004   g_mutex_unlock (&queue->buffering_post_lock);
1005 }
1006
1007 static void
1008 update_buffering (GstQueue2 * queue)
1009 {
1010   gint percent;
1011
1012   /* Ensure the variables used to calculate buffering state are up-to-date. */
1013   if (queue->current)
1014     update_cur_level (queue, queue->current);
1015   update_in_rates (queue);
1016
1017   if (!get_buffering_percent (queue, NULL, &percent))
1018     return;
1019
1020   if (queue->is_buffering) {
1021     /* if we were buffering see if we reached the high watermark */
1022     if (percent >= 100)
1023       queue->is_buffering = FALSE;
1024
1025     SET_PERCENT (queue, percent);
1026   } else {
1027     /* we were not buffering, check if we need to start buffering if we drop
1028      * below the low threshold */
1029     if (percent < queue->low_percent) {
1030       queue->is_buffering = TRUE;
1031       SET_PERCENT (queue, percent);
1032     }
1033   }
1034 }
1035
1036 static void
1037 reset_rate_timer (GstQueue2 * queue)
1038 {
1039   queue->bytes_in = 0;
1040   queue->bytes_out = 0;
1041   queue->byte_in_rate = 0.0;
1042   queue->byte_in_period = 0;
1043   queue->byte_out_rate = 0.0;
1044   queue->last_update_in_rates_elapsed = 0.0;
1045   queue->last_in_elapsed = 0.0;
1046   queue->last_out_elapsed = 0.0;
1047   queue->in_timer_started = FALSE;
1048   queue->out_timer_started = FALSE;
1049 }
1050
1051 /* the interval in seconds to recalculate the rate */
1052 #define RATE_INTERVAL    0.2
1053 /* Tuning for rate estimation. We use a large window for the input rate because
1054  * it should be stable when connected to a network. The output rate is less
1055  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1056  * therefore adapt more quickly.
1057  * However, initial input rate may be subject to a burst, and should therefore
1058  * initially also adapt more quickly to changes, and only later on give higher
1059  * weight to previous values. */
1060 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1061 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1062
1063 static void
1064 update_in_rates (GstQueue2 * queue)
1065 {
1066   gdouble elapsed, period;
1067   gdouble byte_in_rate;
1068
1069   if (!queue->in_timer_started) {
1070     queue->in_timer_started = TRUE;
1071     g_timer_start (queue->in_timer);
1072     return;
1073   }
1074
1075   queue->last_update_in_rates_elapsed = elapsed =
1076       g_timer_elapsed (queue->in_timer, NULL);
1077
1078   /* recalc after each interval. */
1079   if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1080     period = elapsed - queue->last_in_elapsed;
1081
1082     GST_DEBUG_OBJECT (queue,
1083         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1084         period, queue->bytes_in, queue->byte_in_period);
1085
1086     byte_in_rate = queue->bytes_in / period;
1087
1088     if (queue->byte_in_rate == 0.0)
1089       queue->byte_in_rate = byte_in_rate;
1090     else
1091       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1092           (double) queue->byte_in_period, period);
1093
1094     /* another data point, cap at 16 for long time running average */
1095     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1096       queue->byte_in_period += period;
1097
1098     /* reset the values to calculate rate over the next interval */
1099     queue->last_in_elapsed = elapsed;
1100     queue->bytes_in = 0;
1101   }
1102
1103   if (queue->byte_in_rate > 0.0) {
1104     queue->cur_level.rate_time =
1105         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1106   }
1107   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1108       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1109 }
1110
1111 static void
1112 update_out_rates (GstQueue2 * queue)
1113 {
1114   gdouble elapsed, period;
1115   gdouble byte_out_rate;
1116
1117   if (!queue->out_timer_started) {
1118     queue->out_timer_started = TRUE;
1119     g_timer_start (queue->out_timer);
1120     return;
1121   }
1122
1123   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1124
1125   /* recalc after each interval. */
1126   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1127     period = elapsed - queue->last_out_elapsed;
1128
1129     GST_DEBUG_OBJECT (queue,
1130         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1131
1132     byte_out_rate = queue->bytes_out / period;
1133
1134     if (queue->byte_out_rate == 0.0)
1135       queue->byte_out_rate = byte_out_rate;
1136     else
1137       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1138
1139     /* reset the values to calculate rate over the next interval */
1140     queue->last_out_elapsed = elapsed;
1141     queue->bytes_out = 0;
1142   }
1143   if (queue->byte_in_rate > 0.0) {
1144     queue->cur_level.rate_time =
1145         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1146   }
1147   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1148       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1149 }
1150
1151 static void
1152 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1153 {
1154   guint64 reading_pos, max_reading_pos;
1155
1156   reading_pos = pos;
1157   max_reading_pos = range->max_reading_pos;
1158
1159   max_reading_pos = MAX (max_reading_pos, reading_pos);
1160
1161   GST_DEBUG_OBJECT (queue,
1162       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1163       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1164   range->max_reading_pos = max_reading_pos;
1165
1166   update_cur_level (queue, range);
1167 }
1168
1169 static gboolean
1170 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1171 {
1172   GstEvent *event;
1173   gboolean res;
1174
1175   /* until we receive the FLUSH_STOP from this seek, we skip data */
1176   queue->seeking = TRUE;
1177   GST_QUEUE2_MUTEX_UNLOCK (queue);
1178
1179   debug_ranges (queue);
1180
1181   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1182
1183   event =
1184       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1185       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1186       GST_SEEK_TYPE_NONE, -1);
1187
1188   res = gst_pad_push_event (queue->sinkpad, event);
1189   GST_QUEUE2_MUTEX_LOCK (queue);
1190
1191   if (res) {
1192     /* Between us sending the seek event and re-acquiring the lock, the source
1193      * thread might already have pushed data and moved along the range's
1194      * writing_pos beyond the seek offset. In that case we don't want to set
1195      * the writing position back to the requested seek position, as it would
1196      * cause data to be written to the wrong offset in the file or ring buffer.
1197      * We still do the add_range call to switch the current range to the
1198      * requested range, or create one if one doesn't exist yet. */
1199     queue->current = add_range (queue, offset, FALSE);
1200   }
1201
1202   return res;
1203 }
1204
1205 /* get the threshold for when we decide to seek rather than wait */
1206 static guint64
1207 get_seek_threshold (GstQueue2 * queue)
1208 {
1209   guint64 threshold;
1210
1211   /* FIXME, find a good threshold based on the incoming rate. */
1212   threshold = 1024 * 512;
1213
1214   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1215     threshold = MIN (threshold,
1216         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1217   }
1218   return threshold;
1219 }
1220
1221 /* see if there is enough data in the file to read a full buffer */
1222 static gboolean
1223 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1224 {
1225   GstQueue2Range *range;
1226
1227   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1228       offset, length);
1229
1230   if ((range = find_range (queue, offset))) {
1231     if (queue->current != range) {
1232       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1233       perform_seek_to_offset (queue, range->writing_pos);
1234     }
1235
1236     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1237         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1238
1239     /* we have a range for offset */
1240     GST_DEBUG_OBJECT (queue,
1241         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1242         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1243
1244     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1245       return TRUE;
1246
1247     if (offset + length <= range->writing_pos)
1248       return TRUE;
1249     else
1250       GST_DEBUG_OBJECT (queue,
1251           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1252           (offset + length) - range->writing_pos);
1253
1254   } else {
1255     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1256         " len %u", offset, length);
1257     /* we don't have the range, see how far away we are */
1258     if (!queue->is_eos && queue->current) {
1259       guint64 threshold = get_seek_threshold (queue);
1260
1261       if (offset >= queue->current->offset && offset <=
1262           queue->current->writing_pos + threshold) {
1263         GST_INFO_OBJECT (queue,
1264             "requested data is within range, wait for data");
1265         return FALSE;
1266       }
1267     }
1268
1269     /* too far away, do a seek */
1270     perform_seek_to_offset (queue, offset);
1271   }
1272
1273   return FALSE;
1274 }
1275
1276 #ifdef HAVE_FSEEKO
1277 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1278 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1279 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1280 #else
1281 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1282 #endif
1283
1284 static GstFlowReturn
1285 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1286     guint8 * dst, gint64 * read_return)
1287 {
1288   guint8 *ring_buffer;
1289   size_t res;
1290
1291   ring_buffer = queue->ring_buffer;
1292
1293   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1294     goto seek_failed;
1295
1296   /* this should not block */
1297   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1298       length, offset);
1299   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1300     res = fread (dst, 1, length, queue->temp_file);
1301   } else {
1302     memcpy (dst, ring_buffer + offset, length);
1303     res = length;
1304   }
1305
1306   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1307
1308   if (G_UNLIKELY (res < length)) {
1309     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1310       goto could_not_read;
1311     /* check for errors or EOF */
1312     if (ferror (queue->temp_file))
1313       goto could_not_read;
1314     if (feof (queue->temp_file) && length > 0)
1315       goto eos;
1316   }
1317
1318   *read_return = res;
1319
1320   return GST_FLOW_OK;
1321
1322 seek_failed:
1323   {
1324     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1325     return GST_FLOW_ERROR;
1326   }
1327 could_not_read:
1328   {
1329     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1330     return GST_FLOW_ERROR;
1331   }
1332 eos:
1333   {
1334     GST_DEBUG ("non-regular file hits EOS");
1335     return GST_FLOW_EOS;
1336   }
1337 }
1338
1339 static GstFlowReturn
1340 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1341     GstBuffer ** buffer)
1342 {
1343   GstBuffer *buf;
1344   GstMapInfo info;
1345   guint8 *data;
1346   guint64 file_offset;
1347   guint block_length, remaining, read_length;
1348   guint64 rb_size;
1349   guint64 max_size;
1350   guint64 rpos;
1351   GstFlowReturn ret = GST_FLOW_OK;
1352
1353   /* allocate the output buffer of the requested size */
1354   if (*buffer == NULL)
1355     buf = gst_buffer_new_allocate (NULL, length, NULL);
1356   else
1357     buf = *buffer;
1358
1359   gst_buffer_map (buf, &info, GST_MAP_WRITE);
1360   data = info.data;
1361
1362   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1363       offset);
1364
1365   rpos = offset;
1366   rb_size = queue->ring_buffer_max_size;
1367   max_size = QUEUE_MAX_BYTES (queue);
1368
1369   remaining = length;
1370   while (remaining > 0) {
1371     /* configure how much/whether to read */
1372     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1373       read_length = 0;
1374
1375       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1376         guint64 level;
1377
1378         /* calculate how far away the offset is */
1379         if (queue->current->writing_pos > rpos)
1380           level = queue->current->writing_pos - rpos;
1381         else
1382           level = 0;
1383
1384         GST_DEBUG_OBJECT (queue,
1385             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1386             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1387             rpos, queue->current->writing_pos, level, max_size);
1388
1389         if (level >= max_size) {
1390           /* we don't have the data but if we have a ring buffer that is full, we
1391            * need to read */
1392           GST_DEBUG_OBJECT (queue,
1393               "ring buffer full, reading QUEUE_MAX_BYTES %"
1394               G_GUINT64_FORMAT " bytes", max_size);
1395           read_length = max_size;
1396         } else if (queue->is_eos) {
1397           /* won't get any more data so read any data we have */
1398           if (level) {
1399             GST_DEBUG_OBJECT (queue,
1400                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1401                 level);
1402             read_length = level;
1403             remaining = level;
1404             length = level;
1405           } else
1406             goto hit_eos;
1407         }
1408       }
1409
1410       if (read_length == 0) {
1411         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1412           GST_DEBUG_OBJECT (queue,
1413               "update current position [%" G_GUINT64_FORMAT "-%"
1414               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1415           update_cur_pos (queue, queue->current, rpos);
1416           GST_QUEUE2_SIGNAL_DEL (queue);
1417         }
1418
1419         if (queue->use_buffering)
1420           update_buffering (queue);
1421
1422         GST_DEBUG_OBJECT (queue, "waiting for add");
1423         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1424         continue;
1425       }
1426     } else {
1427       /* we have the requested data so read it */
1428       read_length = remaining;
1429     }
1430
1431     /* set range reading_pos to actual reading position for this read */
1432     queue->current->reading_pos = rpos;
1433
1434     /* configure how much and from where to read */
1435     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1436       file_offset =
1437           (queue->current->rb_offset + (rpos -
1438               queue->current->offset)) % rb_size;
1439       if (file_offset + read_length > rb_size) {
1440         block_length = rb_size - file_offset;
1441       } else {
1442         block_length = read_length;
1443       }
1444     } else {
1445       file_offset = rpos;
1446       block_length = read_length;
1447     }
1448
1449     /* while we still have data to read, we loop */
1450     while (read_length > 0) {
1451       gint64 read_return;
1452
1453       ret =
1454           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1455           data, &read_return);
1456       if (ret != GST_FLOW_OK)
1457         goto read_error;
1458
1459       file_offset += read_return;
1460       if (QUEUE_IS_USING_RING_BUFFER (queue))
1461         file_offset %= rb_size;
1462
1463       data += read_return;
1464       read_length -= read_return;
1465       block_length = read_length;
1466       remaining -= read_return;
1467
1468       rpos = (queue->current->reading_pos += read_return);
1469       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1470     }
1471     GST_QUEUE2_SIGNAL_DEL (queue);
1472     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1473   }
1474
1475   gst_buffer_unmap (buf, &info);
1476   gst_buffer_resize (buf, 0, length);
1477
1478   GST_BUFFER_OFFSET (buf) = offset;
1479   GST_BUFFER_OFFSET_END (buf) = offset + length;
1480
1481   *buffer = buf;
1482
1483   return ret;
1484
1485   /* ERRORS */
1486 hit_eos:
1487   {
1488     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1489     gst_buffer_unmap (buf, &info);
1490     if (*buffer == NULL)
1491       gst_buffer_unref (buf);
1492     return GST_FLOW_EOS;
1493   }
1494 out_flushing:
1495   {
1496     GST_DEBUG_OBJECT (queue, "we are flushing");
1497     gst_buffer_unmap (buf, &info);
1498     if (*buffer == NULL)
1499       gst_buffer_unref (buf);
1500     return GST_FLOW_FLUSHING;
1501   }
1502 read_error:
1503   {
1504     GST_DEBUG_OBJECT (queue, "we have a read error");
1505     gst_buffer_unmap (buf, &info);
1506     if (*buffer == NULL)
1507       gst_buffer_unref (buf);
1508     return ret;
1509   }
1510 }
1511
1512 /* should be called with QUEUE_LOCK */
1513 static GstMiniObject *
1514 gst_queue2_read_item_from_file (GstQueue2 * queue)
1515 {
1516   GstMiniObject *item;
1517
1518   if (queue->stream_start_event != NULL) {
1519     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1520     queue->stream_start_event = NULL;
1521   } else if (queue->starting_segment != NULL) {
1522     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1523     queue->starting_segment = NULL;
1524   } else {
1525     GstFlowReturn ret;
1526     GstBuffer *buffer = NULL;
1527     guint64 reading_pos;
1528
1529     reading_pos = queue->current->reading_pos;
1530
1531     ret =
1532         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1533         &buffer);
1534
1535     switch (ret) {
1536       case GST_FLOW_OK:
1537         item = GST_MINI_OBJECT_CAST (buffer);
1538         break;
1539       case GST_FLOW_EOS:
1540         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1541         break;
1542       default:
1543         item = NULL;
1544         break;
1545     }
1546   }
1547   return item;
1548 }
1549
1550 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1551  * the temp filename. */
1552 static gboolean
1553 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1554 {
1555   gint fd = -1;
1556   gchar *name = NULL;
1557
1558   if (queue->temp_file)
1559     goto already_opened;
1560
1561   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1562
1563   /* If temp_template was set, allocate a filename and open that file */
1564
1565   /* nothing to do */
1566   if (queue->temp_template == NULL)
1567     goto no_directory;
1568
1569   /* make copy of the template, we don't want to change this */
1570   name = g_strdup (queue->temp_template);
1571
1572 #ifdef __BIONIC__
1573   fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1574 #else
1575   fd = g_mkstemp (name);
1576 #endif
1577
1578   if (fd == -1)
1579     goto mkstemp_failed;
1580
1581   /* open the file for update/writing */
1582   queue->temp_file = fdopen (fd, "wb+");
1583   /* error creating file */
1584   if (queue->temp_file == NULL)
1585     goto open_failed;
1586
1587   g_free (queue->temp_location);
1588   queue->temp_location = name;
1589
1590   GST_QUEUE2_MUTEX_UNLOCK (queue);
1591
1592   /* we can't emit the notify with the lock */
1593   g_object_notify (G_OBJECT (queue), "temp-location");
1594
1595   GST_QUEUE2_MUTEX_LOCK (queue);
1596
1597   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1598
1599   return TRUE;
1600
1601   /* ERRORS */
1602 already_opened:
1603   {
1604     GST_DEBUG_OBJECT (queue, "temp file was already open");
1605     return TRUE;
1606   }
1607 no_directory:
1608   {
1609     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1610         (_("No Temp directory specified.")), (NULL));
1611     return FALSE;
1612   }
1613 mkstemp_failed:
1614   {
1615     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1616         (_("Could not create temp file \"%s\"."), queue->temp_template),
1617         GST_ERROR_SYSTEM);
1618     g_free (name);
1619     return FALSE;
1620   }
1621 open_failed:
1622   {
1623     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1624         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1625     g_free (name);
1626     if (fd != -1)
1627       close (fd);
1628     return FALSE;
1629   }
1630 }
1631
1632 static void
1633 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1634 {
1635   /* nothing to do */
1636   if (queue->temp_file == NULL)
1637     return;
1638
1639   GST_DEBUG_OBJECT (queue, "closing temp file");
1640
1641   fflush (queue->temp_file);
1642   fclose (queue->temp_file);
1643
1644   if (queue->temp_remove) {
1645     if (remove (queue->temp_location) < 0) {
1646       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1647           queue->temp_location, g_strerror (errno));
1648     }
1649   }
1650
1651   queue->temp_file = NULL;
1652   clean_ranges (queue);
1653 }
1654
1655 static void
1656 gst_queue2_flush_temp_file (GstQueue2 * queue)
1657 {
1658   if (queue->temp_file == NULL)
1659     return;
1660
1661   GST_DEBUG_OBJECT (queue, "flushing temp file");
1662
1663   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1664 }
1665
1666 static void
1667 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1668 {
1669   if (!QUEUE_IS_USING_QUEUE (queue)) {
1670     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1671       gst_queue2_flush_temp_file (queue);
1672     init_ranges (queue);
1673   } else {
1674     while (!g_queue_is_empty (&queue->queue)) {
1675       GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1676
1677       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1678           && GST_EVENT_IS_STICKY (qitem->item)
1679           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1680           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1681         gst_pad_store_sticky_event (queue->srcpad,
1682             GST_EVENT_CAST (qitem->item));
1683       }
1684
1685       /* Then lose another reference because we are supposed to destroy that
1686          data when flushing */
1687       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1688         gst_mini_object_unref (qitem->item);
1689       g_slice_free (GstQueue2Item, qitem);
1690     }
1691   }
1692   queue->last_query = FALSE;
1693   g_cond_signal (&queue->query_handled);
1694   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1695   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1696   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1697   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1698   queue->sink_tainted = queue->src_tainted = TRUE;
1699   if (queue->starting_segment != NULL)
1700     gst_event_unref (queue->starting_segment);
1701   queue->starting_segment = NULL;
1702   queue->segment_event_received = FALSE;
1703   gst_event_replace (&queue->stream_start_event, NULL);
1704
1705   /* we deleted a lot of something */
1706   GST_QUEUE2_SIGNAL_DEL (queue);
1707 }
1708
1709 static gboolean
1710 gst_queue2_wait_free_space (GstQueue2 * queue)
1711 {
1712   /* We make space available if we're "full" according to whatever
1713    * the user defined as "full". */
1714   if (gst_queue2_is_filled (queue)) {
1715     gboolean started;
1716
1717     /* pause the timer while we wait. The fact that we are waiting does not mean
1718      * the byterate on the input pad is lower */
1719     if ((started = queue->in_timer_started))
1720       g_timer_stop (queue->in_timer);
1721
1722     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1723         "queue is full, waiting for free space");
1724     do {
1725       GST_QUEUE2_MUTEX_UNLOCK (queue);
1726       g_signal_emit (queue, gst_queue2_signals[SIGNAL_OVERRUN], 0);
1727       GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
1728       /* we recheck, the signal could have changed the thresholds */
1729       if (!gst_queue2_is_filled (queue))
1730         break;
1731
1732       /* Wait for space to be available, we could be unlocked because of a flush. */
1733       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1734     }
1735     while (gst_queue2_is_filled (queue));
1736
1737     /* and continue if we were running before */
1738     if (started)
1739       g_timer_continue (queue->in_timer);
1740   }
1741   return TRUE;
1742
1743   /* ERRORS */
1744 out_flushing:
1745   {
1746     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1747     return FALSE;
1748   }
1749 }
1750
1751 static gboolean
1752 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1753 {
1754   GstMapInfo info;
1755   guint8 *data, *ring_buffer;
1756   guint size, rb_size;
1757   guint64 writing_pos, new_writing_pos;
1758   GstQueue2Range *range, *prev, *next;
1759   gboolean do_seek = FALSE;
1760
1761   if (QUEUE_IS_USING_RING_BUFFER (queue))
1762     writing_pos = queue->current->rb_writing_pos;
1763   else
1764     writing_pos = queue->current->writing_pos;
1765   ring_buffer = queue->ring_buffer;
1766   rb_size = queue->ring_buffer_max_size;
1767
1768   gst_buffer_map (buffer, &info, GST_MAP_READ);
1769
1770   size = info.size;
1771   data = info.data;
1772
1773   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1774       writing_pos);
1775
1776   /* sanity check */
1777   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1778       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1779     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1780         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1781         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1782   }
1783
1784   while (size > 0) {
1785     guint to_write;
1786
1787     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1788       gint64 space;
1789
1790       /* calculate the space in the ring buffer not used by data from
1791        * the current range */
1792       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1793         /* wait until there is some free space */
1794         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1795       }
1796       /* get the amount of space we have */
1797       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1798
1799       /* calculate if we need to split or if we can write the entire
1800        * buffer now */
1801       to_write = MIN (size, space);
1802
1803       /* the writing position in the ring buffer after writing (part
1804        * or all of) the buffer */
1805       new_writing_pos = (writing_pos + to_write) % rb_size;
1806
1807       prev = NULL;
1808       range = queue->ranges;
1809
1810       /* if we need to overwrite data in the ring buffer, we need to
1811        * update the ranges
1812        *
1813        * warning: this code is complicated and includes some
1814        * simplifications - pen, paper and diagrams for the cases
1815        * recommended! */
1816       while (range) {
1817         guint64 range_data_start, range_data_end;
1818         GstQueue2Range *range_to_destroy = NULL;
1819
1820         range_data_start = range->rb_offset;
1821         range_data_end = range->rb_writing_pos;
1822
1823         /* handle the special case where the range has no data in it */
1824         if (range->writing_pos == range->offset) {
1825           if (range != queue->current) {
1826             GST_DEBUG_OBJECT (queue,
1827                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1828                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1829             /* remove range */
1830             range_to_destroy = range;
1831             if (prev)
1832               prev->next = range->next;
1833           }
1834           goto next_range;
1835         }
1836
1837         if (range_data_end > range_data_start) {
1838           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1839             goto next_range;
1840
1841           if (new_writing_pos > range_data_start) {
1842             if (new_writing_pos >= range_data_end) {
1843               GST_DEBUG_OBJECT (queue,
1844                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1845                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1846               /* remove range */
1847               range_to_destroy = range;
1848               if (prev)
1849                 prev->next = range->next;
1850             } else {
1851               GST_DEBUG_OBJECT (queue,
1852                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1853                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1854                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1855                   range->offset + new_writing_pos - range_data_start,
1856                   new_writing_pos);
1857               range->offset += (new_writing_pos - range_data_start);
1858               range->rb_offset = new_writing_pos;
1859             }
1860           }
1861         } else {
1862           guint64 new_wpos_virt = writing_pos + to_write;
1863
1864           if (new_wpos_virt <= range_data_start)
1865             goto next_range;
1866
1867           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1868             GST_DEBUG_OBJECT (queue,
1869                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1870                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1871             /* remove range */
1872             range_to_destroy = range;
1873             if (prev)
1874               prev->next = range->next;
1875           } else {
1876             GST_DEBUG_OBJECT (queue,
1877                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1878                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1879                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1880                 range->offset + new_writing_pos - range_data_start,
1881                 new_writing_pos);
1882             range->offset += (new_wpos_virt - range_data_start);
1883             range->rb_offset = new_writing_pos;
1884           }
1885         }
1886
1887       next_range:
1888         if (!range_to_destroy)
1889           prev = range;
1890
1891         range = range->next;
1892         if (range_to_destroy) {
1893           if (range_to_destroy == queue->ranges)
1894             queue->ranges = range;
1895           g_slice_free (GstQueue2Range, range_to_destroy);
1896           range_to_destroy = NULL;
1897         }
1898       }
1899     } else {
1900       to_write = size;
1901       new_writing_pos = writing_pos + to_write;
1902     }
1903
1904     if (QUEUE_IS_USING_TEMP_FILE (queue)
1905         && FSEEK_FILE (queue->temp_file, writing_pos))
1906       goto seek_failed;
1907
1908     if (new_writing_pos > writing_pos) {
1909       GST_INFO_OBJECT (queue,
1910           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1911           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
1912           queue->current->writing_pos, queue->current->rb_writing_pos);
1913       /* either not using ring buffer or no wrapping, just write */
1914       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1915         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
1916           goto handle_error;
1917       } else {
1918         memcpy (ring_buffer + writing_pos, data, to_write);
1919       }
1920
1921       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1922         /* try to merge with next range */
1923         while ((next = queue->current->next)) {
1924           GST_INFO_OBJECT (queue,
1925               "checking merge with next range %" G_GUINT64_FORMAT " < %"
1926               G_GUINT64_FORMAT, new_writing_pos, next->offset);
1927           if (new_writing_pos < next->offset)
1928             break;
1929
1930           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
1931               next->writing_pos);
1932
1933           /* remove the group */
1934           queue->current->next = next->next;
1935
1936           /* We use the threshold to decide if we want to do a seek or simply
1937            * read the data again. If there is not so much data in the range we
1938            * prefer to avoid to seek and read it again. */
1939           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
1940             /* the new range had more data than the threshold, it's worth keeping
1941              * it and doing a seek. */
1942             new_writing_pos = next->writing_pos;
1943             do_seek = TRUE;
1944           }
1945           g_slice_free (GstQueue2Range, next);
1946         }
1947         goto update_and_signal;
1948       }
1949     } else {
1950       /* wrapping */
1951       guint block_one, block_two;
1952
1953       block_one = rb_size - writing_pos;
1954       block_two = to_write - block_one;
1955
1956       if (block_one > 0) {
1957         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
1958         /* write data to end of ring buffer */
1959         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1960           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
1961             goto handle_error;
1962         } else {
1963           memcpy (ring_buffer + writing_pos, data, block_one);
1964         }
1965       }
1966
1967       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
1968         goto seek_failed;
1969
1970       if (block_two > 0) {
1971         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
1972         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1973           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
1974             goto handle_error;
1975         } else {
1976           memcpy (ring_buffer, data + block_one, block_two);
1977         }
1978       }
1979     }
1980
1981   update_and_signal:
1982     /* update the writing positions */
1983     size -= to_write;
1984     GST_INFO_OBJECT (queue,
1985         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
1986         to_write, writing_pos, size);
1987
1988     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1989       data += to_write;
1990       queue->current->writing_pos += to_write;
1991       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
1992     } else {
1993       queue->current->writing_pos = writing_pos = new_writing_pos;
1994     }
1995     if (do_seek)
1996       perform_seek_to_offset (queue, new_writing_pos);
1997
1998     update_cur_level (queue, queue->current);
1999
2000     /* update the buffering status */
2001     if (queue->use_buffering)
2002       update_buffering (queue);
2003
2004     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2005         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2006
2007     GST_QUEUE2_SIGNAL_ADD (queue);
2008   }
2009
2010   gst_buffer_unmap (buffer, &info);
2011
2012   return TRUE;
2013
2014   /* ERRORS */
2015 out_flushing:
2016   {
2017     GST_DEBUG_OBJECT (queue, "we are flushing");
2018     gst_buffer_unmap (buffer, &info);
2019     /* FIXME - GST_FLOW_EOS ? */
2020     return FALSE;
2021   }
2022 seek_failed:
2023   {
2024     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2025     gst_buffer_unmap (buffer, &info);
2026     return FALSE;
2027   }
2028 handle_error:
2029   {
2030     switch (errno) {
2031       case ENOSPC:{
2032         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2033         break;
2034       }
2035       default:{
2036         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2037             (_("Error while writing to download file.")),
2038             ("%s", g_strerror (errno)));
2039       }
2040     }
2041     gst_buffer_unmap (buffer, &info);
2042     return FALSE;
2043   }
2044 }
2045
2046 static gboolean
2047 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2048 {
2049   GstQueue2 *queue = q;
2050
2051   GST_TRACE_OBJECT (queue,
2052       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2053       gst_buffer_get_size (*buf));
2054
2055   if (!gst_queue2_create_write (queue, *buf)) {
2056     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2057     return FALSE;
2058   }
2059   return TRUE;
2060 }
2061
2062 static gboolean
2063 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
2064 {
2065   guint *p_size = data;
2066   gsize buf_size;
2067
2068   buf_size = gst_buffer_get_size (*buf);
2069   GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
2070   *p_size += buf_size;
2071   return TRUE;
2072 }
2073
2074 /* enqueue an item an update the level stats */
2075 static void
2076 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2077     GstQueue2ItemType item_type)
2078 {
2079   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2080     GstBuffer *buffer;
2081     guint size;
2082
2083     buffer = GST_BUFFER_CAST (item);
2084     size = gst_buffer_get_size (buffer);
2085
2086     /* add buffer to the statistics */
2087     if (QUEUE_IS_USING_QUEUE (queue)) {
2088       queue->cur_level.buffers++;
2089       queue->cur_level.bytes += size;
2090     }
2091     queue->bytes_in += size;
2092
2093     /* apply new buffer to segment stats */
2094     apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
2095     /* update the byterate stats */
2096     update_in_rates (queue);
2097
2098     if (!QUEUE_IS_USING_QUEUE (queue)) {
2099       /* FIXME - check return value? */
2100       gst_queue2_create_write (queue, buffer);
2101     }
2102   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2103     GstBufferList *buffer_list;
2104     guint size = 0;
2105
2106     buffer_list = GST_BUFFER_LIST_CAST (item);
2107
2108     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2109     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2110
2111     /* add buffer to the statistics */
2112     if (QUEUE_IS_USING_QUEUE (queue)) {
2113       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2114       queue->cur_level.bytes += size;
2115     }
2116     queue->bytes_in += size;
2117
2118     /* apply new buffer to segment stats */
2119     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2120
2121     /* update the byterate stats */
2122     update_in_rates (queue);
2123
2124     if (!QUEUE_IS_USING_QUEUE (queue)) {
2125       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2126     }
2127   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2128     GstEvent *event;
2129
2130     event = GST_EVENT_CAST (item);
2131
2132     switch (GST_EVENT_TYPE (event)) {
2133       case GST_EVENT_EOS:
2134         /* Zero the thresholds, this makes sure the queue is completely
2135          * filled and we can read all data from the queue. */
2136         GST_DEBUG_OBJECT (queue, "we have EOS");
2137         queue->is_eos = TRUE;
2138         break;
2139       case GST_EVENT_SEGMENT:
2140         apply_segment (queue, event, &queue->sink_segment, TRUE);
2141         /* This is our first new segment, we hold it
2142          * as we can't save it on the temp file */
2143         if (!QUEUE_IS_USING_QUEUE (queue)) {
2144           if (queue->segment_event_received)
2145             goto unexpected_event;
2146
2147           queue->segment_event_received = TRUE;
2148           if (queue->starting_segment != NULL)
2149             gst_event_unref (queue->starting_segment);
2150           queue->starting_segment = event;
2151           item = NULL;
2152         }
2153         /* a new segment allows us to accept more buffers if we got EOS
2154          * from downstream */
2155         queue->unexpected = FALSE;
2156         break;
2157       case GST_EVENT_GAP:
2158         apply_gap (queue, event, &queue->sink_segment, TRUE);
2159         break;
2160       case GST_EVENT_STREAM_START:
2161         if (!QUEUE_IS_USING_QUEUE (queue)) {
2162           gst_event_replace (&queue->stream_start_event, event);
2163           gst_event_unref (event);
2164           item = NULL;
2165         }
2166         break;
2167       case GST_EVENT_CAPS:{
2168         GstCaps *caps;
2169
2170         gst_event_parse_caps (event, &caps);
2171         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2172
2173         if (!QUEUE_IS_USING_QUEUE (queue)) {
2174           GST_LOG ("Dropping caps event, not using queue");
2175           gst_event_unref (event);
2176           item = NULL;
2177         }
2178         break;
2179       }
2180       default:
2181         if (!QUEUE_IS_USING_QUEUE (queue))
2182           goto unexpected_event;
2183         break;
2184     }
2185   } else if (GST_IS_QUERY (item)) {
2186     /* Can't happen as we check that in the caller */
2187     if (!QUEUE_IS_USING_QUEUE (queue))
2188       g_assert_not_reached ();
2189   } else {
2190     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2191         item, GST_OBJECT_NAME (queue));
2192     /* we can't really unref since we don't know what it is */
2193     item = NULL;
2194   }
2195
2196   if (item) {
2197     /* update the buffering status */
2198     if (queue->use_buffering)
2199       update_buffering (queue);
2200
2201     if (QUEUE_IS_USING_QUEUE (queue)) {
2202       GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2203       qitem->type = item_type;
2204       qitem->item = item;
2205       g_queue_push_tail (&queue->queue, qitem);
2206     } else {
2207       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2208     }
2209
2210     GST_QUEUE2_SIGNAL_ADD (queue);
2211   }
2212
2213   return;
2214
2215   /* ERRORS */
2216 unexpected_event:
2217   {
2218     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2219
2220     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2221         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2222         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2223     gst_event_unref (GST_EVENT_CAST (item));
2224     return;
2225   }
2226 }
2227
2228 /* dequeue an item from the queue and update level stats */
2229 static GstMiniObject *
2230 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2231 {
2232   GstMiniObject *item;
2233
2234   if (!QUEUE_IS_USING_QUEUE (queue)) {
2235     item = gst_queue2_read_item_from_file (queue);
2236   } else {
2237     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2238
2239     if (qitem == NULL)
2240       goto no_item;
2241
2242     item = qitem->item;
2243     g_slice_free (GstQueue2Item, qitem);
2244   }
2245
2246   if (item == NULL)
2247     goto no_item;
2248
2249   if (GST_IS_BUFFER (item)) {
2250     GstBuffer *buffer;
2251     guint size;
2252
2253     buffer = GST_BUFFER_CAST (item);
2254     size = gst_buffer_get_size (buffer);
2255     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2256
2257     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2258         "retrieved buffer %p from queue", buffer);
2259
2260     if (QUEUE_IS_USING_QUEUE (queue)) {
2261       queue->cur_level.buffers--;
2262       queue->cur_level.bytes -= size;
2263     }
2264     queue->bytes_out += size;
2265
2266     apply_buffer (queue, buffer, &queue->src_segment, FALSE);
2267     /* update the byterate stats */
2268     update_out_rates (queue);
2269     /* update the buffering */
2270     if (queue->use_buffering)
2271       update_buffering (queue);
2272
2273   } else if (GST_IS_EVENT (item)) {
2274     GstEvent *event = GST_EVENT_CAST (item);
2275
2276     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2277
2278     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2279         "retrieved event %p from queue", event);
2280
2281     switch (GST_EVENT_TYPE (event)) {
2282       case GST_EVENT_EOS:
2283         /* queue is empty now that we dequeued the EOS */
2284         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2285         break;
2286       case GST_EVENT_SEGMENT:
2287         apply_segment (queue, event, &queue->src_segment, FALSE);
2288         break;
2289       case GST_EVENT_GAP:
2290         apply_gap (queue, event, &queue->src_segment, FALSE);
2291         break;
2292       default:
2293         break;
2294     }
2295   } else if (GST_IS_BUFFER_LIST (item)) {
2296     GstBufferList *buffer_list;
2297     guint size = 0;
2298
2299     buffer_list = GST_BUFFER_LIST_CAST (item);
2300     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2301     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2302
2303     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2304         "retrieved buffer list %p from queue", buffer_list);
2305
2306     if (QUEUE_IS_USING_QUEUE (queue)) {
2307       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2308       queue->cur_level.bytes -= size;
2309     }
2310     queue->bytes_out += size;
2311
2312     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2313     /* update the byterate stats */
2314     update_out_rates (queue);
2315     /* update the buffering */
2316     if (queue->use_buffering)
2317       update_buffering (queue);
2318   } else if (GST_IS_QUERY (item)) {
2319     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2320         "retrieved query %p from queue", item);
2321     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2322   } else {
2323     g_warning
2324         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2325         item, GST_OBJECT_NAME (queue));
2326     item = NULL;
2327     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2328   }
2329   GST_QUEUE2_SIGNAL_DEL (queue);
2330
2331   return item;
2332
2333   /* ERRORS */
2334 no_item:
2335   {
2336     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2337     return NULL;
2338   }
2339 }
2340
2341 static gboolean
2342 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2343     GstEvent * event)
2344 {
2345   gboolean ret = TRUE;
2346   GstQueue2 *queue;
2347
2348   queue = GST_QUEUE2 (parent);
2349
2350   switch (GST_EVENT_TYPE (event)) {
2351     case GST_EVENT_FLUSH_START:
2352     {
2353       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2354       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2355         /* forward event */
2356         ret = gst_pad_push_event (queue->srcpad, event);
2357
2358         /* now unblock the chain function */
2359         GST_QUEUE2_MUTEX_LOCK (queue);
2360         queue->srcresult = GST_FLOW_FLUSHING;
2361         queue->sinkresult = GST_FLOW_FLUSHING;
2362         /* unblock the loop and chain functions */
2363         GST_QUEUE2_SIGNAL_ADD (queue);
2364         GST_QUEUE2_SIGNAL_DEL (queue);
2365         queue->last_query = FALSE;
2366         g_cond_signal (&queue->query_handled);
2367         GST_QUEUE2_MUTEX_UNLOCK (queue);
2368
2369         /* make sure it pauses, this should happen since we sent
2370          * flush_start downstream. */
2371         gst_pad_pause_task (queue->srcpad);
2372         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2373       } else {
2374         GST_QUEUE2_MUTEX_LOCK (queue);
2375         /* flush the sink pad */
2376         queue->sinkresult = GST_FLOW_FLUSHING;
2377         GST_QUEUE2_SIGNAL_DEL (queue);
2378         queue->last_query = FALSE;
2379         g_cond_signal (&queue->query_handled);
2380         GST_QUEUE2_MUTEX_UNLOCK (queue);
2381
2382         gst_event_unref (event);
2383       }
2384       break;
2385     }
2386     case GST_EVENT_FLUSH_STOP:
2387     {
2388       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2389
2390       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2391         /* forward event */
2392         ret = gst_pad_push_event (queue->srcpad, event);
2393
2394         GST_QUEUE2_MUTEX_LOCK (queue);
2395         gst_queue2_locked_flush (queue, FALSE, TRUE);
2396         queue->srcresult = GST_FLOW_OK;
2397         queue->sinkresult = GST_FLOW_OK;
2398         queue->is_eos = FALSE;
2399         queue->unexpected = FALSE;
2400         queue->seeking = FALSE;
2401         /* reset rate counters */
2402         reset_rate_timer (queue);
2403         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2404             queue->srcpad, NULL);
2405         GST_QUEUE2_MUTEX_UNLOCK (queue);
2406       } else {
2407         GST_QUEUE2_MUTEX_LOCK (queue);
2408         queue->segment_event_received = FALSE;
2409         queue->is_eos = FALSE;
2410         queue->unexpected = FALSE;
2411         queue->sinkresult = GST_FLOW_OK;
2412         queue->seeking = FALSE;
2413         GST_QUEUE2_MUTEX_UNLOCK (queue);
2414
2415         gst_event_unref (event);
2416       }
2417       break;
2418     }
2419     default:
2420       if (GST_EVENT_IS_SERIALIZED (event)) {
2421         /* serialized events go in the queue */
2422         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2423         if (queue->srcresult != GST_FLOW_OK) {
2424           /* Errors in sticky event pushing are no problem and ignored here
2425            * as they will cause more meaningful errors during data flow.
2426            * For EOS events, that are not followed by data flow, we still
2427            * return FALSE here though and report an error.
2428            */
2429           if (!GST_EVENT_IS_STICKY (event)) {
2430             goto out_flow_error;
2431           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2432             if (queue->srcresult == GST_FLOW_NOT_LINKED
2433                 || queue->srcresult < GST_FLOW_EOS) {
2434               GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2435                   (_("Internal data flow error.")),
2436                   ("streaming task paused, reason %s (%d)",
2437                       gst_flow_get_name (queue->srcresult), queue->srcresult));
2438             }
2439             goto out_flow_error;
2440           }
2441         }
2442         /* refuse more events on EOS */
2443         if (queue->is_eos)
2444           goto out_eos;
2445         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2446         GST_QUEUE2_MUTEX_UNLOCK (queue);
2447         gst_queue2_post_buffering (queue);
2448       } else {
2449         /* non-serialized events are passed upstream. */
2450         ret = gst_pad_push_event (queue->srcpad, event);
2451       }
2452       break;
2453   }
2454   return ret;
2455
2456   /* ERRORS */
2457 out_flushing:
2458   {
2459     GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2460     GST_QUEUE2_MUTEX_UNLOCK (queue);
2461     gst_event_unref (event);
2462     return FALSE;
2463   }
2464 out_eos:
2465   {
2466     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2467     GST_QUEUE2_MUTEX_UNLOCK (queue);
2468     gst_event_unref (event);
2469     return FALSE;
2470   }
2471 out_flow_error:
2472   {
2473     GST_LOG_OBJECT (queue,
2474         "refusing event, we have a downstream flow error: %s",
2475         gst_flow_get_name (queue->srcresult));
2476     GST_QUEUE2_MUTEX_UNLOCK (queue);
2477     gst_event_unref (event);
2478     return FALSE;
2479   }
2480 }
2481
2482 static gboolean
2483 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2484     GstQuery * query)
2485 {
2486   GstQueue2 *queue;
2487   gboolean res;
2488
2489   queue = GST_QUEUE2 (parent);
2490
2491   switch (GST_QUERY_TYPE (query)) {
2492     default:
2493       if (GST_QUERY_IS_SERIALIZED (query)) {
2494         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2495         /* serialized events go in the queue. We need to be certain that we
2496          * don't cause deadlocks waiting for the query return value. We check if
2497          * the queue is empty (nothing is blocking downstream and the query can
2498          * be pushed for sure) or we are not buffering. If we are buffering,
2499          * the pipeline waits to unblock downstream until our queue fills up
2500          * completely, which can not happen if we block on the query..
2501          * Therefore we only potentially block when we are not buffering. */
2502         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2503         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2504                 || !queue->use_buffering)) {
2505           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2506             gst_queue2_locked_enqueue (queue, query,
2507                 GST_QUEUE2_ITEM_TYPE_QUERY);
2508
2509             STATUS (queue, queue->sinkpad, "wait for QUERY");
2510             g_cond_wait (&queue->query_handled, &queue->qlock);
2511             if (queue->sinkresult != GST_FLOW_OK)
2512               goto out_flushing;
2513             res = queue->last_query;
2514           } else {
2515             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2516             res = FALSE;
2517           }
2518         } else {
2519           GST_DEBUG_OBJECT (queue,
2520               "refusing query, we are not using the queue");
2521           res = FALSE;
2522         }
2523         GST_QUEUE2_MUTEX_UNLOCK (queue);
2524         gst_queue2_post_buffering (queue);
2525       } else {
2526         res = gst_pad_query_default (pad, parent, query);
2527       }
2528       break;
2529   }
2530   return res;
2531
2532   /* ERRORS */
2533 out_flushing:
2534   {
2535     GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
2536     GST_QUEUE2_MUTEX_UNLOCK (queue);
2537     return FALSE;
2538   }
2539 }
2540
2541 static gboolean
2542 gst_queue2_is_empty (GstQueue2 * queue)
2543 {
2544   /* never empty on EOS */
2545   if (queue->is_eos)
2546     return FALSE;
2547
2548   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2549     return queue->current->writing_pos <= queue->current->max_reading_pos;
2550   } else {
2551     if (queue->queue.length == 0)
2552       return TRUE;
2553   }
2554
2555   return FALSE;
2556 }
2557
2558 static gboolean
2559 gst_queue2_is_filled (GstQueue2 * queue)
2560 {
2561   gboolean res;
2562
2563   /* always filled on EOS */
2564   if (queue->is_eos)
2565     return TRUE;
2566
2567 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2568     (queue->cur_level.format) >= ((alt_max) ? \
2569       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2570
2571   /* if using a ring buffer we're filled if all ring buffer space is used
2572    * _by the current range_ */
2573   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2574     guint64 rb_size = queue->ring_buffer_max_size;
2575     GST_DEBUG_OBJECT (queue,
2576         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2577         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2578     return CHECK_FILLED (bytes, rb_size);
2579   }
2580
2581   /* if using file, we're never filled if we don't have EOS */
2582   if (QUEUE_IS_USING_TEMP_FILE (queue))
2583     return FALSE;
2584
2585   /* we are never filled when we have no buffers at all */
2586   if (queue->cur_level.buffers == 0)
2587     return FALSE;
2588
2589   /* we are filled if one of the current levels exceeds the max */
2590   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2591       || CHECK_FILLED (time, 0);
2592
2593   /* if we need to, use the rate estimate to check against the max time we are
2594    * allowed to queue */
2595   if (queue->use_rate_estimate)
2596     res |= CHECK_FILLED (rate_time, 0);
2597
2598 #undef CHECK_FILLED
2599   return res;
2600 }
2601
2602 static GstFlowReturn
2603 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2604     GstMiniObject * item, GstQueue2ItemType item_type)
2605 {
2606   /* we have to lock the queue since we span threads */
2607   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2608   /* when we received EOS, we refuse more data */
2609   if (queue->is_eos)
2610     goto out_eos;
2611   /* when we received unexpected from downstream, refuse more buffers */
2612   if (queue->unexpected)
2613     goto out_unexpected;
2614
2615   /* while we didn't receive the newsegment, we're seeking and we skip data */
2616   if (queue->seeking)
2617     goto out_seeking;
2618
2619   if (!gst_queue2_wait_free_space (queue))
2620     goto out_flushing;
2621
2622   /* put buffer in queue now */
2623   gst_queue2_locked_enqueue (queue, item, item_type);
2624   GST_QUEUE2_MUTEX_UNLOCK (queue);
2625   gst_queue2_post_buffering (queue);
2626
2627   return GST_FLOW_OK;
2628
2629   /* special conditions */
2630 out_flushing:
2631   {
2632     GstFlowReturn ret = queue->sinkresult;
2633
2634     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2635         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2636     GST_QUEUE2_MUTEX_UNLOCK (queue);
2637     gst_mini_object_unref (item);
2638
2639     return ret;
2640   }
2641 out_eos:
2642   {
2643     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2644     GST_QUEUE2_MUTEX_UNLOCK (queue);
2645     gst_mini_object_unref (item);
2646
2647     return GST_FLOW_EOS;
2648   }
2649 out_seeking:
2650   {
2651     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2652     GST_QUEUE2_MUTEX_UNLOCK (queue);
2653     gst_mini_object_unref (item);
2654
2655     return GST_FLOW_OK;
2656   }
2657 out_unexpected:
2658   {
2659     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2660     GST_QUEUE2_MUTEX_UNLOCK (queue);
2661     gst_mini_object_unref (item);
2662
2663     return GST_FLOW_EOS;
2664   }
2665 }
2666
2667 static GstFlowReturn
2668 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2669 {
2670   GstQueue2 *queue;
2671
2672   queue = GST_QUEUE2 (parent);
2673
2674   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2675       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2676       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2677       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2678       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2679
2680   return gst_queue2_chain_buffer_or_buffer_list (queue,
2681       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2682 }
2683
2684 static GstFlowReturn
2685 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2686     GstBufferList * buffer_list)
2687 {
2688   GstQueue2 *queue;
2689
2690   queue = GST_QUEUE2 (parent);
2691
2692   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2693       "received buffer list %p", buffer_list);
2694
2695   return gst_queue2_chain_buffer_or_buffer_list (queue,
2696       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2697 }
2698
2699 static GstMiniObject *
2700 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2701 {
2702   GstMiniObject *data;
2703
2704   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2705
2706   /* stop pushing buffers, we dequeue all items until we see an item that we
2707    * can push again, which is EOS or SEGMENT. If there is nothing in the
2708    * queue we can push, we set a flag to make the sinkpad refuse more
2709    * buffers with an EOS return value until we receive something
2710    * pushable again or we get flushed. */
2711   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2712     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2713       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2714           "dropping EOS buffer %p", data);
2715       gst_buffer_unref (GST_BUFFER_CAST (data));
2716     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2717       GstEvent *event = GST_EVENT_CAST (data);
2718       GstEventType type = GST_EVENT_TYPE (event);
2719
2720       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2721         /* we found a pushable item in the queue, push it out */
2722         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2723             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2724         return data;
2725       }
2726       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2727           "dropping EOS event %p", event);
2728       gst_event_unref (event);
2729     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2730       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2731           "dropping EOS buffer list %p", data);
2732       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2733     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2734       queue->last_query = FALSE;
2735       g_cond_signal (&queue->query_handled);
2736       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2737     }
2738   }
2739   /* no more items in the queue. Set the unexpected flag so that upstream
2740    * make us refuse any more buffers on the sinkpad. Since we will still
2741    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2742    * task function does not shut down. */
2743   queue->unexpected = TRUE;
2744   return NULL;
2745 }
2746
2747 /* dequeue an item from the queue an push it downstream. This functions returns
2748  * the result of the push. */
2749 static GstFlowReturn
2750 gst_queue2_push_one (GstQueue2 * queue)
2751 {
2752   GstFlowReturn result = queue->srcresult;
2753   GstMiniObject *data;
2754   GstQueue2ItemType item_type;
2755
2756   data = gst_queue2_locked_dequeue (queue, &item_type);
2757   if (data == NULL)
2758     goto no_item;
2759
2760 next:
2761   STATUS (queue, queue->srcpad, "We have something dequeud");
2762   g_atomic_int_set (&queue->downstream_may_block,
2763       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2764       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2765   GST_QUEUE2_MUTEX_UNLOCK (queue);
2766   gst_queue2_post_buffering (queue);
2767
2768   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2769     GstBuffer *buffer;
2770
2771     buffer = GST_BUFFER_CAST (data);
2772
2773     result = gst_pad_push (queue->srcpad, buffer);
2774     g_atomic_int_set (&queue->downstream_may_block, 0);
2775
2776     /* need to check for srcresult here as well */
2777     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2778     if (result == GST_FLOW_EOS) {
2779       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2780       if (data != NULL)
2781         goto next;
2782       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2783        * to the caller so that the task function does not shut down */
2784       result = GST_FLOW_OK;
2785     }
2786   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2787     GstEvent *event = GST_EVENT_CAST (data);
2788     GstEventType type = GST_EVENT_TYPE (event);
2789
2790     gst_pad_push_event (queue->srcpad, event);
2791
2792     /* if we're EOS, return EOS so that the task pauses. */
2793     if (type == GST_EVENT_EOS) {
2794       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2795           "pushed EOS event %p, return EOS", event);
2796       result = GST_FLOW_EOS;
2797     }
2798
2799     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2800   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2801     GstBufferList *buffer_list;
2802
2803     buffer_list = GST_BUFFER_LIST_CAST (data);
2804
2805     result = gst_pad_push_list (queue->srcpad, buffer_list);
2806     g_atomic_int_set (&queue->downstream_may_block, 0);
2807
2808     /* need to check for srcresult here as well */
2809     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2810     if (result == GST_FLOW_EOS) {
2811       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2812       if (data != NULL)
2813         goto next;
2814       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2815        * to the caller so that the task function does not shut down */
2816       result = GST_FLOW_OK;
2817     }
2818   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2819     GstQuery *query = GST_QUERY_CAST (data);
2820
2821     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2822     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2823     GST_LOG_OBJECT (queue->srcpad, "Peered query");
2824     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2825         "did query %p, return %d", query, queue->last_query);
2826     g_cond_signal (&queue->query_handled);
2827     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2828     result = GST_FLOW_OK;
2829   }
2830   return result;
2831
2832   /* ERRORS */
2833 no_item:
2834   {
2835     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2836         "exit because we have no item in the queue");
2837     return GST_FLOW_ERROR;
2838   }
2839 out_flushing:
2840   {
2841     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2842     return GST_FLOW_FLUSHING;
2843   }
2844 }
2845
2846 /* called repeatedly with @pad as the source pad. This function should push out
2847  * data to the peer element. */
2848 static void
2849 gst_queue2_loop (GstPad * pad)
2850 {
2851   GstQueue2 *queue;
2852   GstFlowReturn ret;
2853
2854   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2855
2856   /* have to lock for thread-safety */
2857   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2858
2859   if (gst_queue2_is_empty (queue)) {
2860     gboolean started;
2861
2862     /* pause the timer while we wait. The fact that we are waiting does not mean
2863      * the byterate on the output pad is lower */
2864     if ((started = queue->out_timer_started))
2865       g_timer_stop (queue->out_timer);
2866
2867     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2868         "queue is empty, waiting for new data");
2869     do {
2870       /* Wait for data to be available, we could be unlocked because of a flush. */
2871       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2872     }
2873     while (gst_queue2_is_empty (queue));
2874
2875     /* and continue if we were running before */
2876     if (started)
2877       g_timer_continue (queue->out_timer);
2878   }
2879   ret = gst_queue2_push_one (queue);
2880   queue->srcresult = ret;
2881   queue->sinkresult = ret;
2882   if (ret != GST_FLOW_OK)
2883     goto out_flushing;
2884
2885   GST_QUEUE2_MUTEX_UNLOCK (queue);
2886   gst_queue2_post_buffering (queue);
2887
2888   return;
2889
2890   /* ERRORS */
2891 out_flushing:
2892   {
2893     gboolean eos = queue->is_eos;
2894     GstFlowReturn ret = queue->srcresult;
2895
2896     gst_pad_pause_task (queue->srcpad);
2897     if (ret == GST_FLOW_FLUSHING) {
2898       gst_queue2_locked_flush (queue, FALSE, FALSE);
2899     } else {
2900       GST_QUEUE2_SIGNAL_DEL (queue);
2901       queue->last_query = FALSE;
2902       g_cond_signal (&queue->query_handled);
2903     }
2904     GST_QUEUE2_MUTEX_UNLOCK (queue);
2905     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2906         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
2907     /* let app know about us giving up if upstream is not expected to do so */
2908     /* EOS is already taken care of elsewhere */
2909     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
2910       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2911           (_("Internal data flow error.")),
2912           ("streaming task paused, reason %s (%d)",
2913               gst_flow_get_name (ret), ret));
2914       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
2915     }
2916     return;
2917   }
2918 }
2919
2920 static gboolean
2921 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2922 {
2923   gboolean res = TRUE;
2924   GstQueue2 *queue = GST_QUEUE2 (parent);
2925
2926 #ifndef GST_DISABLE_GST_DEBUG
2927   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
2928       event, GST_EVENT_TYPE_NAME (event));
2929 #endif
2930
2931   switch (GST_EVENT_TYPE (event)) {
2932     case GST_EVENT_FLUSH_START:
2933       if (QUEUE_IS_USING_QUEUE (queue)) {
2934         /* just forward upstream */
2935         res = gst_pad_push_event (queue->sinkpad, event);
2936       } else {
2937         /* now unblock the getrange function */
2938         GST_QUEUE2_MUTEX_LOCK (queue);
2939         GST_DEBUG_OBJECT (queue, "flushing");
2940         queue->srcresult = GST_FLOW_FLUSHING;
2941         GST_QUEUE2_SIGNAL_ADD (queue);
2942         GST_QUEUE2_MUTEX_UNLOCK (queue);
2943
2944         /* when using a temp file, we eat the event */
2945         res = TRUE;
2946         gst_event_unref (event);
2947       }
2948       break;
2949     case GST_EVENT_FLUSH_STOP:
2950       if (QUEUE_IS_USING_QUEUE (queue)) {
2951         /* just forward upstream */
2952         res = gst_pad_push_event (queue->sinkpad, event);
2953       } else {
2954         /* now unblock the getrange function */
2955         GST_QUEUE2_MUTEX_LOCK (queue);
2956         queue->srcresult = GST_FLOW_OK;
2957         GST_QUEUE2_MUTEX_UNLOCK (queue);
2958
2959         /* when using a temp file, we eat the event */
2960         res = TRUE;
2961         gst_event_unref (event);
2962       }
2963       break;
2964     case GST_EVENT_RECONFIGURE:
2965       GST_QUEUE2_MUTEX_LOCK (queue);
2966       /* assume downstream is linked now and try to push again */
2967       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
2968         queue->srcresult = GST_FLOW_OK;
2969         queue->sinkresult = GST_FLOW_OK;
2970         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
2971           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
2972               NULL);
2973         }
2974       }
2975       GST_QUEUE2_MUTEX_UNLOCK (queue);
2976
2977       res = gst_pad_push_event (queue->sinkpad, event);
2978       break;
2979     default:
2980       res = gst_pad_push_event (queue->sinkpad, event);
2981       break;
2982   }
2983
2984   return res;
2985 }
2986
2987 static gboolean
2988 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2989 {
2990   GstQueue2 *queue;
2991
2992   queue = GST_QUEUE2 (parent);
2993
2994   switch (GST_QUERY_TYPE (query)) {
2995     case GST_QUERY_POSITION:
2996     {
2997       gint64 peer_pos;
2998       GstFormat format;
2999
3000       if (!gst_pad_peer_query (queue->sinkpad, query))
3001         goto peer_failed;
3002
3003       /* get peer position */
3004       gst_query_parse_position (query, &format, &peer_pos);
3005
3006       /* FIXME: this code assumes that there's no discont in the queue */
3007       switch (format) {
3008         case GST_FORMAT_BYTES:
3009           peer_pos -= queue->cur_level.bytes;
3010           break;
3011         case GST_FORMAT_TIME:
3012           peer_pos -= queue->cur_level.time;
3013           break;
3014         default:
3015           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3016               "know how to adjust value", gst_format_get_name (format));
3017           return FALSE;
3018       }
3019       /* set updated position */
3020       gst_query_set_position (query, format, peer_pos);
3021       break;
3022     }
3023     case GST_QUERY_DURATION:
3024     {
3025       GST_DEBUG_OBJECT (queue, "doing peer query");
3026
3027       if (!gst_pad_peer_query (queue->sinkpad, query))
3028         goto peer_failed;
3029
3030       GST_DEBUG_OBJECT (queue, "peer query success");
3031       break;
3032     }
3033     case GST_QUERY_BUFFERING:
3034     {
3035       gint percent;
3036       gboolean is_buffering;
3037       GstBufferingMode mode;
3038       gint avg_in, avg_out;
3039       gint64 buffering_left;
3040
3041       GST_DEBUG_OBJECT (queue, "query buffering");
3042
3043       get_buffering_percent (queue, &is_buffering, &percent);
3044       gst_query_set_buffering_percent (query, is_buffering, percent);
3045
3046       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3047           &buffering_left);
3048       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3049           buffering_left);
3050
3051       if (!QUEUE_IS_USING_QUEUE (queue)) {
3052         /* add ranges for download and ringbuffer buffering */
3053         GstFormat format;
3054         gint64 start, stop, range_start, range_stop;
3055         guint64 writing_pos;
3056         gint64 estimated_total;
3057         gint64 duration;
3058         gboolean peer_res, is_eos;
3059         GstQueue2Range *queued_ranges;
3060
3061         /* we need a current download region */
3062         if (queue->current == NULL)
3063           return FALSE;
3064
3065         writing_pos = queue->current->writing_pos;
3066         is_eos = queue->is_eos;
3067
3068         if (is_eos) {
3069           /* we're EOS, we know the duration in bytes now */
3070           peer_res = TRUE;
3071           duration = writing_pos;
3072         } else {
3073           /* get duration of upstream in bytes */
3074           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3075               GST_FORMAT_BYTES, &duration);
3076         }
3077
3078         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3079             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3080
3081         /* calculate remaining and total download time */
3082         if (peer_res && avg_in > 0.0)
3083           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3084         else
3085           estimated_total = -1;
3086
3087         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3088             estimated_total);
3089
3090         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3091
3092         switch (format) {
3093           case GST_FORMAT_PERCENT:
3094             /* we need duration */
3095             if (!peer_res)
3096               goto peer_failed;
3097
3098             start = 0;
3099             /* get our available data relative to the duration */
3100             if (duration != -1)
3101               stop =
3102                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3103                   duration);
3104             else
3105               stop = -1;
3106             break;
3107           case GST_FORMAT_BYTES:
3108             start = 0;
3109             stop = writing_pos;
3110             break;
3111           default:
3112             start = -1;
3113             stop = -1;
3114             break;
3115         }
3116
3117         /* fill out the buffered ranges */
3118         for (queued_ranges = queue->ranges; queued_ranges;
3119             queued_ranges = queued_ranges->next) {
3120           switch (format) {
3121             case GST_FORMAT_PERCENT:
3122               if (duration == -1) {
3123                 range_start = 0;
3124                 range_stop = 0;
3125                 break;
3126               }
3127               range_start =
3128                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3129                   queued_ranges->offset, duration);
3130               range_stop =
3131                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3132                   queued_ranges->writing_pos, duration);
3133               break;
3134             case GST_FORMAT_BYTES:
3135               range_start = queued_ranges->offset;
3136               range_stop = queued_ranges->writing_pos;
3137               break;
3138             default:
3139               range_start = -1;
3140               range_stop = -1;
3141               break;
3142           }
3143           if (range_start == range_stop)
3144             continue;
3145           GST_DEBUG_OBJECT (queue,
3146               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3147               G_GINT64_FORMAT, range_start, range_stop);
3148           gst_query_add_buffering_range (query, range_start, range_stop);
3149         }
3150
3151         gst_query_set_buffering_range (query, format, start, stop,
3152             estimated_total);
3153       }
3154       break;
3155     }
3156     case GST_QUERY_SCHEDULING:
3157     {
3158       gboolean pull_mode;
3159       GstSchedulingFlags flags = 0;
3160
3161       if (!gst_pad_peer_query (queue->sinkpad, query))
3162         goto peer_failed;
3163
3164       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3165
3166       /* we can operate in pull mode when we are using a tempfile */
3167       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3168
3169       if (pull_mode)
3170         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3171       gst_query_set_scheduling (query, flags, 0, -1, 0);
3172       if (pull_mode)
3173         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3174       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3175       break;
3176     }
3177     default:
3178       /* peer handled other queries */
3179       if (!gst_pad_query_default (pad, parent, query))
3180         goto peer_failed;
3181       break;
3182   }
3183
3184   return TRUE;
3185
3186   /* ERRORS */
3187 peer_failed:
3188   {
3189     GST_DEBUG_OBJECT (queue, "failed peer query");
3190     return FALSE;
3191   }
3192 }
3193
3194 static gboolean
3195 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3196 {
3197   GstQueue2 *queue = GST_QUEUE2 (element);
3198
3199   /* simply forward to the srcpad query function */
3200   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3201       query);
3202 }
3203
3204 static void
3205 gst_queue2_update_upstream_size (GstQueue2 * queue)
3206 {
3207   gint64 upstream_size = -1;
3208
3209   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3210           &upstream_size)) {
3211     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3212
3213     /* upstream_size can be negative but queue->upstream_size is unsigned.
3214      * Prevent setting negative values to it (the query can return -1) */
3215     if (upstream_size >= 0)
3216       queue->upstream_size = upstream_size;
3217     else
3218       queue->upstream_size = 0;
3219   }
3220 }
3221
3222 static GstFlowReturn
3223 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3224     guint length, GstBuffer ** buffer)
3225 {
3226   GstQueue2 *queue;
3227   GstFlowReturn ret;
3228
3229   queue = GST_QUEUE2_CAST (parent);
3230
3231   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3232   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3233   offset = (offset == -1) ? queue->current->reading_pos : offset;
3234
3235   GST_DEBUG_OBJECT (queue,
3236       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3237
3238   /* catch any reads beyond the size of the file here to make sure queue2
3239    * doesn't send seek events beyond the size of the file upstream, since
3240    * that would confuse elements such as souphttpsrc and/or http servers.
3241    * Demuxers often just loop until EOS at the end of the file to figure out
3242    * when they've read all the end-headers or index chunks. */
3243   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3244     gst_queue2_update_upstream_size (queue);
3245     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3246       goto out_unexpected;
3247   }
3248
3249   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3250     gst_queue2_update_upstream_size (queue);
3251     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3252       length = queue->upstream_size - offset;
3253       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3254     }
3255   }
3256
3257   /* FIXME - function will block when the range is not yet available */
3258   ret = gst_queue2_create_read (queue, offset, length, buffer);
3259   GST_QUEUE2_MUTEX_UNLOCK (queue);
3260   gst_queue2_post_buffering (queue);
3261
3262   return ret;
3263
3264   /* ERRORS */
3265 out_flushing:
3266   {
3267     ret = queue->srcresult;
3268
3269     GST_DEBUG_OBJECT (queue, "we are flushing");
3270     GST_QUEUE2_MUTEX_UNLOCK (queue);
3271     return ret;
3272   }
3273 out_unexpected:
3274   {
3275     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3276     GST_QUEUE2_MUTEX_UNLOCK (queue);
3277     return GST_FLOW_EOS;
3278   }
3279 }
3280
3281 /* sink currently only operates in push mode */
3282 static gboolean
3283 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3284     GstPadMode mode, gboolean active)
3285 {
3286   gboolean result;
3287   GstQueue2 *queue;
3288
3289   queue = GST_QUEUE2 (parent);
3290
3291   switch (mode) {
3292     case GST_PAD_MODE_PUSH:
3293       if (active) {
3294         GST_QUEUE2_MUTEX_LOCK (queue);
3295         GST_DEBUG_OBJECT (queue, "activating push mode");
3296         queue->srcresult = GST_FLOW_OK;
3297         queue->sinkresult = GST_FLOW_OK;
3298         queue->is_eos = FALSE;
3299         queue->unexpected = FALSE;
3300         reset_rate_timer (queue);
3301         GST_QUEUE2_MUTEX_UNLOCK (queue);
3302       } else {
3303         /* unblock chain function */
3304         GST_QUEUE2_MUTEX_LOCK (queue);
3305         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3306         queue->srcresult = GST_FLOW_FLUSHING;
3307         queue->sinkresult = GST_FLOW_FLUSHING;
3308         GST_QUEUE2_SIGNAL_DEL (queue);
3309         /* Unblock query handler */
3310         queue->last_query = FALSE;
3311         g_cond_signal (&queue->query_handled);
3312         GST_QUEUE2_MUTEX_UNLOCK (queue);
3313
3314         /* wait until it is unblocked and clean up */
3315         GST_PAD_STREAM_LOCK (pad);
3316         GST_QUEUE2_MUTEX_LOCK (queue);
3317         gst_queue2_locked_flush (queue, TRUE, FALSE);
3318         GST_QUEUE2_MUTEX_UNLOCK (queue);
3319         GST_PAD_STREAM_UNLOCK (pad);
3320       }
3321       result = TRUE;
3322       break;
3323     default:
3324       result = FALSE;
3325       break;
3326   }
3327   return result;
3328 }
3329
3330 /* src operating in push mode, we start a task on the source pad that pushes out
3331  * buffers from the queue */
3332 static gboolean
3333 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3334 {
3335   gboolean result = FALSE;
3336   GstQueue2 *queue;
3337
3338   queue = GST_QUEUE2 (parent);
3339
3340   if (active) {
3341     GST_QUEUE2_MUTEX_LOCK (queue);
3342     GST_DEBUG_OBJECT (queue, "activating push mode");
3343     queue->srcresult = GST_FLOW_OK;
3344     queue->sinkresult = GST_FLOW_OK;
3345     queue->is_eos = FALSE;
3346     queue->unexpected = FALSE;
3347     result =
3348         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3349     GST_QUEUE2_MUTEX_UNLOCK (queue);
3350   } else {
3351     /* unblock loop function */
3352     GST_QUEUE2_MUTEX_LOCK (queue);
3353     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3354     queue->srcresult = GST_FLOW_FLUSHING;
3355     queue->sinkresult = GST_FLOW_FLUSHING;
3356     /* the item add signal will unblock */
3357     GST_QUEUE2_SIGNAL_ADD (queue);
3358     GST_QUEUE2_MUTEX_UNLOCK (queue);
3359
3360     /* step 2, make sure streaming finishes */
3361     result = gst_pad_stop_task (pad);
3362   }
3363
3364   return result;
3365 }
3366
3367 /* pull mode, downstream will call our getrange function */
3368 static gboolean
3369 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3370 {
3371   gboolean result;
3372   GstQueue2 *queue;
3373
3374   queue = GST_QUEUE2 (parent);
3375
3376   if (active) {
3377     GST_QUEUE2_MUTEX_LOCK (queue);
3378     if (!QUEUE_IS_USING_QUEUE (queue)) {
3379       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3380         /* open the temp file now */
3381         result = gst_queue2_open_temp_location_file (queue);
3382       } else if (!queue->ring_buffer) {
3383         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3384         result = ! !queue->ring_buffer;
3385       } else {
3386         result = TRUE;
3387       }
3388
3389       GST_DEBUG_OBJECT (queue, "activating pull mode");
3390       init_ranges (queue);
3391       queue->srcresult = GST_FLOW_OK;
3392       queue->sinkresult = GST_FLOW_OK;
3393       queue->is_eos = FALSE;
3394       queue->unexpected = FALSE;
3395       queue->upstream_size = 0;
3396     } else {
3397       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3398       /* this is not allowed, we cannot operate in pull mode without a temp
3399        * file. */
3400       queue->srcresult = GST_FLOW_FLUSHING;
3401       queue->sinkresult = GST_FLOW_FLUSHING;
3402       result = FALSE;
3403     }
3404     GST_QUEUE2_MUTEX_UNLOCK (queue);
3405   } else {
3406     GST_QUEUE2_MUTEX_LOCK (queue);
3407     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3408     queue->srcresult = GST_FLOW_FLUSHING;
3409     queue->sinkresult = GST_FLOW_FLUSHING;
3410     /* this will unlock getrange */
3411     GST_QUEUE2_SIGNAL_ADD (queue);
3412     result = TRUE;
3413     GST_QUEUE2_MUTEX_UNLOCK (queue);
3414   }
3415
3416   return result;
3417 }
3418
3419 static gboolean
3420 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3421     gboolean active)
3422 {
3423   gboolean res;
3424
3425   switch (mode) {
3426     case GST_PAD_MODE_PULL:
3427       res = gst_queue2_src_activate_pull (pad, parent, active);
3428       break;
3429     case GST_PAD_MODE_PUSH:
3430       res = gst_queue2_src_activate_push (pad, parent, active);
3431       break;
3432     default:
3433       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3434       res = FALSE;
3435       break;
3436   }
3437   return res;
3438 }
3439
3440 static GstStateChangeReturn
3441 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3442 {
3443   GstQueue2 *queue;
3444   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3445
3446   queue = GST_QUEUE2 (element);
3447
3448   switch (transition) {
3449     case GST_STATE_CHANGE_NULL_TO_READY:
3450       break;
3451     case GST_STATE_CHANGE_READY_TO_PAUSED:
3452       GST_QUEUE2_MUTEX_LOCK (queue);
3453       if (!QUEUE_IS_USING_QUEUE (queue)) {
3454         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3455           if (!gst_queue2_open_temp_location_file (queue))
3456             ret = GST_STATE_CHANGE_FAILURE;
3457         } else {
3458           if (queue->ring_buffer) {
3459             g_free (queue->ring_buffer);
3460             queue->ring_buffer = NULL;
3461           }
3462           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3463             ret = GST_STATE_CHANGE_FAILURE;
3464         }
3465         init_ranges (queue);
3466       }
3467       queue->segment_event_received = FALSE;
3468       queue->starting_segment = NULL;
3469       gst_event_replace (&queue->stream_start_event, NULL);
3470       GST_QUEUE2_MUTEX_UNLOCK (queue);
3471       break;
3472     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3473       break;
3474     default:
3475       break;
3476   }
3477
3478   if (ret == GST_STATE_CHANGE_FAILURE)
3479     return ret;
3480
3481   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3482
3483   if (ret == GST_STATE_CHANGE_FAILURE)
3484     return ret;
3485
3486   switch (transition) {
3487     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3488       break;
3489     case GST_STATE_CHANGE_PAUSED_TO_READY:
3490       GST_QUEUE2_MUTEX_LOCK (queue);
3491       if (!QUEUE_IS_USING_QUEUE (queue)) {
3492         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3493           gst_queue2_close_temp_location_file (queue);
3494         } else if (queue->ring_buffer) {
3495           g_free (queue->ring_buffer);
3496           queue->ring_buffer = NULL;
3497         }
3498         clean_ranges (queue);
3499       }
3500       if (queue->starting_segment != NULL) {
3501         gst_event_unref (queue->starting_segment);
3502         queue->starting_segment = NULL;
3503       }
3504       gst_event_replace (&queue->stream_start_event, NULL);
3505       GST_QUEUE2_MUTEX_UNLOCK (queue);
3506       break;
3507     case GST_STATE_CHANGE_READY_TO_NULL:
3508       break;
3509     default:
3510       break;
3511   }
3512
3513   return ret;
3514 }
3515
3516 /* changing the capacity of the queue must wake up
3517  * the _chain function, it might have more room now
3518  * to store the buffer/event in the queue */
3519 #define QUEUE_CAPACITY_CHANGE(q) \
3520   GST_QUEUE2_SIGNAL_DEL (queue); \
3521   if (queue->use_buffering)      \
3522     update_buffering (queue);
3523
3524 /* Changing the minimum required fill level must
3525  * wake up the _loop function as it might now
3526  * be able to preceed.
3527  */
3528 #define QUEUE_THRESHOLD_CHANGE(q)\
3529   GST_QUEUE2_SIGNAL_ADD (queue);
3530
3531 static void
3532 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3533 {
3534   GstState state;
3535
3536   /* the element must be stopped in order to do this */
3537   GST_OBJECT_LOCK (queue);
3538   state = GST_STATE (queue);
3539   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3540     goto wrong_state;
3541   GST_OBJECT_UNLOCK (queue);
3542
3543   /* set new location */
3544   g_free (queue->temp_template);
3545   queue->temp_template = g_strdup (template);
3546
3547   return;
3548
3549 /* ERROR */
3550 wrong_state:
3551   {
3552     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3553     GST_OBJECT_UNLOCK (queue);
3554   }
3555 }
3556
3557 static void
3558 gst_queue2_set_property (GObject * object,
3559     guint prop_id, const GValue * value, GParamSpec * pspec)
3560 {
3561   GstQueue2 *queue = GST_QUEUE2 (object);
3562
3563   /* someone could change levels here, and since this
3564    * affects the get/put funcs, we need to lock for safety. */
3565   GST_QUEUE2_MUTEX_LOCK (queue);
3566
3567   switch (prop_id) {
3568     case PROP_MAX_SIZE_BYTES:
3569       queue->max_level.bytes = g_value_get_uint (value);
3570       QUEUE_CAPACITY_CHANGE (queue);
3571       break;
3572     case PROP_MAX_SIZE_BUFFERS:
3573       queue->max_level.buffers = g_value_get_uint (value);
3574       QUEUE_CAPACITY_CHANGE (queue);
3575       break;
3576     case PROP_MAX_SIZE_TIME:
3577       queue->max_level.time = g_value_get_uint64 (value);
3578       /* set rate_time to the same value. We use an extra field in the level
3579        * structure so that we can easily access and compare it */
3580       queue->max_level.rate_time = queue->max_level.time;
3581       QUEUE_CAPACITY_CHANGE (queue);
3582       break;
3583     case PROP_USE_BUFFERING:
3584       queue->use_buffering = g_value_get_boolean (value);
3585       if (!queue->use_buffering && queue->is_buffering) {
3586         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3587             "posting 100%% message");
3588         SET_PERCENT (queue, 100);
3589         queue->is_buffering = FALSE;
3590       }
3591
3592       if (queue->use_buffering) {
3593         queue->is_buffering = TRUE;
3594         update_buffering (queue);
3595       }
3596       break;
3597     case PROP_USE_RATE_ESTIMATE:
3598       queue->use_rate_estimate = g_value_get_boolean (value);
3599       break;
3600     case PROP_LOW_PERCENT:
3601       queue->low_percent = g_value_get_int (value);
3602       break;
3603     case PROP_HIGH_PERCENT:
3604       queue->high_percent = g_value_get_int (value);
3605       break;
3606     case PROP_TEMP_TEMPLATE:
3607       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3608       break;
3609     case PROP_TEMP_REMOVE:
3610       queue->temp_remove = g_value_get_boolean (value);
3611       break;
3612     case PROP_RING_BUFFER_MAX_SIZE:
3613       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3614       break;
3615     default:
3616       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3617       break;
3618   }
3619
3620   GST_QUEUE2_MUTEX_UNLOCK (queue);
3621   gst_queue2_post_buffering (queue);
3622 }
3623
3624 static void
3625 gst_queue2_get_property (GObject * object,
3626     guint prop_id, GValue * value, GParamSpec * pspec)
3627 {
3628   GstQueue2 *queue = GST_QUEUE2 (object);
3629
3630   GST_QUEUE2_MUTEX_LOCK (queue);
3631
3632   switch (prop_id) {
3633     case PROP_CUR_LEVEL_BYTES:
3634       g_value_set_uint (value, queue->cur_level.bytes);
3635       break;
3636     case PROP_CUR_LEVEL_BUFFERS:
3637       g_value_set_uint (value, queue->cur_level.buffers);
3638       break;
3639     case PROP_CUR_LEVEL_TIME:
3640       g_value_set_uint64 (value, queue->cur_level.time);
3641       break;
3642     case PROP_MAX_SIZE_BYTES:
3643       g_value_set_uint (value, queue->max_level.bytes);
3644       break;
3645     case PROP_MAX_SIZE_BUFFERS:
3646       g_value_set_uint (value, queue->max_level.buffers);
3647       break;
3648     case PROP_MAX_SIZE_TIME:
3649       g_value_set_uint64 (value, queue->max_level.time);
3650       break;
3651     case PROP_USE_BUFFERING:
3652       g_value_set_boolean (value, queue->use_buffering);
3653       break;
3654     case PROP_USE_RATE_ESTIMATE:
3655       g_value_set_boolean (value, queue->use_rate_estimate);
3656       break;
3657     case PROP_LOW_PERCENT:
3658       g_value_set_int (value, queue->low_percent);
3659       break;
3660     case PROP_HIGH_PERCENT:
3661       g_value_set_int (value, queue->high_percent);
3662       break;
3663     case PROP_TEMP_TEMPLATE:
3664       g_value_set_string (value, queue->temp_template);
3665       break;
3666     case PROP_TEMP_LOCATION:
3667       g_value_set_string (value, queue->temp_location);
3668       break;
3669     case PROP_TEMP_REMOVE:
3670       g_value_set_boolean (value, queue->temp_remove);
3671       break;
3672     case PROP_RING_BUFFER_MAX_SIZE:
3673       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3674       break;
3675     case PROP_AVG_IN_RATE:
3676     {
3677       gdouble in_rate = queue->byte_in_rate;
3678
3679       /* During the first RATE_INTERVAL, byte_in_rate will not have been
3680        * calculated, so calculate it here. */
3681       if (in_rate == 0.0 && queue->bytes_in
3682           && queue->last_update_in_rates_elapsed > 0.0)
3683         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
3684
3685       g_value_set_int64 (value, (gint64) in_rate);
3686       break;
3687     }
3688     default:
3689       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3690       break;
3691   }
3692
3693   GST_QUEUE2_MUTEX_UNLOCK (queue);
3694 }