queue2: enable large file support on Android
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue2:current-level-buffers property.
40  *
41  * The default queue size limits are 100 buffers, 2MB of data, or
42  * two seconds worth of data, whichever is reached first.
43  *
44  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
45  * will allocate a random free filename and buffer data in the file.
46  * By using this, it will buffer the entire stream data on the file independently
47  * of the queue size limits, they will only be used for buffering statistics.
48  *
49  * The temp-location property will be used to notify the application of the
50  * allocated filename.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56
57 #include "gstqueue2.h"
58
59 #include <glib/gstdio.h>
60
61 #include "gst/gst-i18n-lib.h"
62 #include "gst/glib-compat-private.h"
63
64 #include <string.h>
65
66 #ifdef G_OS_WIN32
67 #include <io.h>                 /* lseek, open, close, read */
68 #undef lseek
69 #define lseek _lseeki64
70 #undef off_t
71 #define off_t guint64
72 #else
73 #include <unistd.h>
74 #endif
75
76 #ifdef __BIONIC__               /* Android */
77 #undef lseek
78 #define lseek lseek64
79 #undef off_t
80 #define off_t guint64
81 #include <fcntl.h>
82 #endif
83
84 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
85     GST_PAD_SINK,
86     GST_PAD_ALWAYS,
87     GST_STATIC_CAPS_ANY);
88
89 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
90     GST_PAD_SRC,
91     GST_PAD_ALWAYS,
92     GST_STATIC_CAPS_ANY);
93
94 GST_DEBUG_CATEGORY_STATIC (queue_debug);
95 #define GST_CAT_DEFAULT (queue_debug)
96 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
97
98 enum
99 {
100   SIGNAL_OVERRUN,
101   LAST_SIGNAL
102 };
103
104 /* other defines */
105 #define DEFAULT_BUFFER_SIZE 4096
106 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
107 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
108 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
109
110 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
111
112 /* default property values */
113 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
114 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
115 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
116 #define DEFAULT_USE_BUFFERING      FALSE
117 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
118 #define DEFAULT_LOW_PERCENT        10
119 #define DEFAULT_HIGH_PERCENT       99
120 #define DEFAULT_TEMP_REMOVE        TRUE
121 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
122
123 enum
124 {
125   PROP_0,
126   PROP_CUR_LEVEL_BUFFERS,
127   PROP_CUR_LEVEL_BYTES,
128   PROP_CUR_LEVEL_TIME,
129   PROP_MAX_SIZE_BUFFERS,
130   PROP_MAX_SIZE_BYTES,
131   PROP_MAX_SIZE_TIME,
132   PROP_USE_BUFFERING,
133   PROP_USE_RATE_ESTIMATE,
134   PROP_LOW_PERCENT,
135   PROP_HIGH_PERCENT,
136   PROP_TEMP_TEMPLATE,
137   PROP_TEMP_LOCATION,
138   PROP_TEMP_REMOVE,
139   PROP_RING_BUFFER_MAX_SIZE,
140   PROP_AVG_IN_RATE,
141   PROP_LAST
142 };
143
144 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
145   l.buffers = 0;                                        \
146   l.bytes = 0;                                          \
147   l.time = 0;                                           \
148   l.rate_time = 0;                                      \
149 } G_STMT_END
150
151 #define STATUS(queue, pad, msg) \
152   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
153                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
154                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
155                       " ns, %"G_GUINT64_FORMAT" items", \
156                       GST_DEBUG_PAD_NAME (pad), \
157                       queue->cur_level.buffers, \
158                       queue->max_level.buffers, \
159                       queue->cur_level.bytes, \
160                       queue->max_level.bytes, \
161                       queue->cur_level.time, \
162                       queue->max_level.time, \
163                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
164                         queue->current->writing_pos - queue->current->max_reading_pos : \
165                         queue->queue.length))
166
167 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
168   g_mutex_lock (&q->qlock);                                              \
169 } G_STMT_END
170
171 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
172   GST_QUEUE2_MUTEX_LOCK (q);                                            \
173   if (res != GST_FLOW_OK)                                               \
174     goto label;                                                         \
175 } G_STMT_END
176
177 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
178   g_mutex_unlock (&q->qlock);                                            \
179 } G_STMT_END
180
181 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
182   STATUS (queue, q->sinkpad, "wait for DEL");                           \
183   q->waiting_del = TRUE;                                                \
184   g_cond_wait (&q->item_del, &queue->qlock);                              \
185   q->waiting_del = FALSE;                                               \
186   if (res != GST_FLOW_OK) {                                             \
187     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
188     goto label;                                                         \
189   }                                                                     \
190   STATUS (queue, q->sinkpad, "received DEL");                           \
191 } G_STMT_END
192
193 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
194   STATUS (queue, q->srcpad, "wait for ADD");                            \
195   q->waiting_add = TRUE;                                                \
196   g_cond_wait (&q->item_add, &q->qlock);                                  \
197   q->waiting_add = FALSE;                                               \
198   if (res != GST_FLOW_OK) {                                             \
199     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
200     goto label;                                                         \
201   }                                                                     \
202   STATUS (queue, q->srcpad, "received ADD");                            \
203 } G_STMT_END
204
205 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
206   if (q->waiting_del) {                                                 \
207     STATUS (q, q->srcpad, "signal DEL");                                \
208     g_cond_signal (&q->item_del);                                        \
209   }                                                                     \
210 } G_STMT_END
211
212 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
213   if (q->waiting_add) {                                                 \
214     STATUS (q, q->sinkpad, "signal ADD");                               \
215     g_cond_signal (&q->item_add);                                        \
216   }                                                                     \
217 } G_STMT_END
218
219 #define SET_PERCENT(q, perc) G_STMT_START {                              \
220   if (perc != q->buffering_percent) {                                    \
221     q->buffering_percent = perc;                                         \
222     q->percent_changed = TRUE;                                           \
223     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
224     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
225         &q->buffering_left);                                             \
226   }                                                                      \
227 } G_STMT_END
228
229 #define _do_init \
230     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
231     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
232         "dataflow inside the queue element");
233 #define gst_queue2_parent_class parent_class
234 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
235
236 static void gst_queue2_finalize (GObject * object);
237
238 static void gst_queue2_set_property (GObject * object,
239     guint prop_id, const GValue * value, GParamSpec * pspec);
240 static void gst_queue2_get_property (GObject * object,
241     guint prop_id, GValue * value, GParamSpec * pspec);
242
243 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
244     GstBuffer * buffer);
245 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
246     GstBufferList * buffer_list);
247 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
248 static void gst_queue2_loop (GstPad * pad);
249
250 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
251     GstEvent * event);
252 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
253     GstQuery * query);
254
255 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
256     GstEvent * event);
257 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
258     GstQuery * query);
259 static gboolean gst_queue2_handle_query (GstElement * element,
260     GstQuery * query);
261
262 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
263     guint64 offset, guint length, GstBuffer ** buffer);
264
265 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
266     GstPadMode mode, gboolean active);
267 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
268     GstPadMode mode, gboolean active);
269 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
270     GstStateChange transition);
271
272 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
273 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
274
275 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
276 static void update_in_rates (GstQueue2 * queue);
277 static void gst_queue2_post_buffering (GstQueue2 * queue);
278
279 typedef enum
280 {
281   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
282   GST_QUEUE2_ITEM_TYPE_BUFFER,
283   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
284   GST_QUEUE2_ITEM_TYPE_EVENT,
285   GST_QUEUE2_ITEM_TYPE_QUERY
286 } GstQueue2ItemType;
287
288 typedef struct
289 {
290   GstQueue2ItemType type;
291   GstMiniObject *item;
292 } GstQueue2Item;
293
294 static guint gst_queue2_signals[LAST_SIGNAL] = { 0 };
295
296 static void
297 gst_queue2_class_init (GstQueue2Class * klass)
298 {
299   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
300   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
301
302   gobject_class->set_property = gst_queue2_set_property;
303   gobject_class->get_property = gst_queue2_get_property;
304
305   /* signals */
306   /**
307    * GstQueue2::overrun:
308    * @queue: the queue2 instance
309    *
310    * Reports that the buffer became full (overrun).
311    * A buffer is full if the total amount of data inside it (num-buffers, time,
312    * size) is higher than the boundary values which can be set through the
313    * GObject properties.
314    *
315    * Since: 1.8
316    */
317   gst_queue2_signals[SIGNAL_OVERRUN] =
318       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
319       G_STRUCT_OFFSET (GstQueue2Class, overrun), NULL, NULL,
320       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
321
322   /* properties */
323   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
324       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
325           "Current amount of data in the queue (bytes)",
326           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
327   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
328       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
329           "Current number of buffers in the queue",
330           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
331   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
332       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
333           "Current amount of data in the queue (in ns)",
334           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
335
336   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
338           "Max. amount of data in the queue (bytes, 0=disable)",
339           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
340           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
341           G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
343       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
344           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
345           DEFAULT_MAX_SIZE_BUFFERS,
346           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
347           G_PARAM_STATIC_STRINGS));
348   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
349       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
350           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
351           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
352           G_PARAM_STATIC_STRINGS));
353
354   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
355       g_param_spec_boolean ("use-buffering", "Use buffering",
356           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
357           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
358           G_PARAM_STATIC_STRINGS));
359   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
360       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
361           "Estimate the bitrate of the stream to calculate time level",
362           DEFAULT_USE_RATE_ESTIMATE,
363           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
365       g_param_spec_int ("low-percent", "Low percent",
366           "Low threshold for buffering to start. Only used if use-buffering is True",
367           0, 100, DEFAULT_LOW_PERCENT,
368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
370       g_param_spec_int ("high-percent", "High percent",
371           "High threshold for buffering to finish. Only used if use-buffering is True",
372           0, 100, DEFAULT_HIGH_PERCENT,
373           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374
375   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
376       g_param_spec_string ("temp-template", "Temporary File Template",
377           "File template to store temporary files in, should contain directory "
378           "and XXXXXX. (NULL == disabled)",
379           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380
381   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
382       g_param_spec_string ("temp-location", "Temporary File Location",
383           "Location to store temporary files in (Only read this property, "
384           "use temp-template to configure the name template)",
385           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
386
387   /**
388    * GstQueue2:temp-remove
389    *
390    * When temp-template is set, remove the temporary file when going to READY.
391    */
392   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
393       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
394           "Remove the temp-location after use",
395           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
396
397   /**
398    * GstQueue2:ring-buffer-max-size
399    *
400    * The maximum size of the ring buffer in bytes. If set to 0, the ring
401    * buffer is disabled. Default 0.
402    */
403   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
404       g_param_spec_uint64 ("ring-buffer-max-size",
405           "Max. ring buffer size (bytes)",
406           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
407           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
408           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409
410   /**
411    * GstQueue2:avg-in-rate
412    *
413    * The average input data rate.
414    */
415   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
416       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
417           "Average input data rate (bytes/s)",
418           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
419
420   /* set several parent class virtual functions */
421   gobject_class->finalize = gst_queue2_finalize;
422
423   gst_element_class_add_pad_template (gstelement_class,
424       gst_static_pad_template_get (&srctemplate));
425   gst_element_class_add_pad_template (gstelement_class,
426       gst_static_pad_template_get (&sinktemplate));
427
428   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
429       "Generic",
430       "Simple data queue",
431       "Erik Walthinsen <omega@cse.ogi.edu>, "
432       "Wim Taymans <wim.taymans@gmail.com>");
433
434   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
435   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
436 }
437
438 static void
439 gst_queue2_init (GstQueue2 * queue)
440 {
441   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
442
443   gst_pad_set_chain_function (queue->sinkpad,
444       GST_DEBUG_FUNCPTR (gst_queue2_chain));
445   gst_pad_set_chain_list_function (queue->sinkpad,
446       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
447   gst_pad_set_activatemode_function (queue->sinkpad,
448       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
449   gst_pad_set_event_function (queue->sinkpad,
450       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
451   gst_pad_set_query_function (queue->sinkpad,
452       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
453   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
454   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
455
456   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
457
458   gst_pad_set_activatemode_function (queue->srcpad,
459       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
460   gst_pad_set_getrange_function (queue->srcpad,
461       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
462   gst_pad_set_event_function (queue->srcpad,
463       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
464   gst_pad_set_query_function (queue->srcpad,
465       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
466   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
467   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
468
469   /* levels */
470   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
471   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
472   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
473   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
474   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
475   queue->use_buffering = DEFAULT_USE_BUFFERING;
476   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
477   queue->low_percent = DEFAULT_LOW_PERCENT;
478   queue->high_percent = DEFAULT_HIGH_PERCENT;
479
480   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
481   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
482
483   queue->sinktime = GST_CLOCK_TIME_NONE;
484   queue->srctime = GST_CLOCK_TIME_NONE;
485   queue->sink_tainted = TRUE;
486   queue->src_tainted = TRUE;
487
488   queue->srcresult = GST_FLOW_FLUSHING;
489   queue->sinkresult = GST_FLOW_FLUSHING;
490   queue->is_eos = FALSE;
491   queue->in_timer = g_timer_new ();
492   queue->out_timer = g_timer_new ();
493
494   g_mutex_init (&queue->qlock);
495   queue->waiting_add = FALSE;
496   g_cond_init (&queue->item_add);
497   queue->waiting_del = FALSE;
498   g_cond_init (&queue->item_del);
499   g_queue_init (&queue->queue);
500
501   g_cond_init (&queue->query_handled);
502   queue->last_query = FALSE;
503
504   g_mutex_init (&queue->buffering_post_lock);
505   queue->buffering_percent = 100;
506
507   /* tempfile related */
508   queue->temp_template = NULL;
509   queue->temp_location = NULL;
510   queue->temp_remove = DEFAULT_TEMP_REMOVE;
511
512   queue->ring_buffer = NULL;
513   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
514
515   GST_DEBUG_OBJECT (queue,
516       "initialized queue's not_empty & not_full conditions");
517 }
518
519 /* called only once, as opposed to dispose */
520 static void
521 gst_queue2_finalize (GObject * object)
522 {
523   GstQueue2 *queue = GST_QUEUE2 (object);
524
525   GST_DEBUG_OBJECT (queue, "finalizing queue");
526
527   while (!g_queue_is_empty (&queue->queue)) {
528     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
529
530     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
531       gst_mini_object_unref (qitem->item);
532     g_slice_free (GstQueue2Item, qitem);
533   }
534
535   queue->last_query = FALSE;
536   g_queue_clear (&queue->queue);
537   g_mutex_clear (&queue->qlock);
538   g_mutex_clear (&queue->buffering_post_lock);
539   g_cond_clear (&queue->item_add);
540   g_cond_clear (&queue->item_del);
541   g_cond_clear (&queue->query_handled);
542   g_timer_destroy (queue->in_timer);
543   g_timer_destroy (queue->out_timer);
544
545   /* temp_file path cleanup  */
546   g_free (queue->temp_template);
547   g_free (queue->temp_location);
548
549   G_OBJECT_CLASS (parent_class)->finalize (object);
550 }
551
552 static void
553 debug_ranges (GstQueue2 * queue)
554 {
555   GstQueue2Range *walk;
556
557   for (walk = queue->ranges; walk; walk = walk->next) {
558     GST_DEBUG_OBJECT (queue,
559         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
560         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
561         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
562         walk->rb_writing_pos, walk->reading_pos,
563         walk == queue->current ? "**y**" : "  n  ");
564   }
565 }
566
567 /* clear all the downloaded ranges */
568 static void
569 clean_ranges (GstQueue2 * queue)
570 {
571   GST_DEBUG_OBJECT (queue, "clean queue ranges");
572
573   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
574   queue->ranges = NULL;
575   queue->current = NULL;
576 }
577
578 /* find a range that contains @offset or NULL when nothing does */
579 static GstQueue2Range *
580 find_range (GstQueue2 * queue, guint64 offset)
581 {
582   GstQueue2Range *range = NULL;
583   GstQueue2Range *walk;
584
585   /* first do a quick check for the current range */
586   for (walk = queue->ranges; walk; walk = walk->next) {
587     if (offset >= walk->offset && offset <= walk->writing_pos) {
588       /* we can reuse an existing range */
589       range = walk;
590       break;
591     }
592   }
593   if (range) {
594     GST_DEBUG_OBJECT (queue,
595         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
596         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
597   } else {
598     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
599   }
600   return range;
601 }
602
603 static void
604 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
605 {
606   guint64 max_reading_pos, writing_pos;
607
608   writing_pos = range->writing_pos;
609   max_reading_pos = range->max_reading_pos;
610
611   if (writing_pos > max_reading_pos)
612     queue->cur_level.bytes = writing_pos - max_reading_pos;
613   else
614     queue->cur_level.bytes = 0;
615 }
616
617 /* make a new range for @offset or reuse an existing range */
618 static GstQueue2Range *
619 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
620 {
621   GstQueue2Range *range, *prev, *next;
622
623   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
624
625   if ((range = find_range (queue, offset))) {
626     GST_DEBUG_OBJECT (queue,
627         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
628         range->writing_pos);
629     if (update_existing && range->writing_pos != offset) {
630       GST_DEBUG_OBJECT (queue, "updating range writing position to "
631           "%" G_GUINT64_FORMAT, offset);
632       range->writing_pos = offset;
633     }
634   } else {
635     GST_DEBUG_OBJECT (queue,
636         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
637
638     range = g_slice_new0 (GstQueue2Range);
639     range->offset = offset;
640     /* we want to write to the next location in the ring buffer */
641     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
642     range->writing_pos = offset;
643     range->rb_writing_pos = range->rb_offset;
644     range->reading_pos = offset;
645     range->max_reading_pos = offset;
646
647     /* insert sorted */
648     prev = NULL;
649     next = queue->ranges;
650     while (next) {
651       if (next->offset > offset) {
652         /* insert before next */
653         GST_DEBUG_OBJECT (queue,
654             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
655             next->offset);
656         break;
657       }
658       /* try next */
659       prev = next;
660       next = next->next;
661     }
662     range->next = next;
663     if (prev)
664       prev->next = range;
665     else
666       queue->ranges = range;
667   }
668   debug_ranges (queue);
669
670   /* update the stats for this range */
671   update_cur_level (queue, range);
672
673   return range;
674 }
675
676
677 /* clear and init the download ranges for offset 0 */
678 static void
679 init_ranges (GstQueue2 * queue)
680 {
681   GST_DEBUG_OBJECT (queue, "init queue ranges");
682
683   /* get rid of all the current ranges */
684   clean_ranges (queue);
685   /* make a range for offset 0 */
686   queue->current = add_range (queue, 0, TRUE);
687 }
688
689 /* calculate the diff between running time on the sink and src of the queue.
690  * This is the total amount of time in the queue. */
691 static void
692 update_time_level (GstQueue2 * queue)
693 {
694   if (queue->sink_tainted) {
695     queue->sinktime =
696         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
697         queue->sink_segment.position);
698     queue->sink_tainted = FALSE;
699   }
700
701   if (queue->src_tainted) {
702     queue->srctime =
703         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
704         queue->src_segment.position);
705     queue->src_tainted = FALSE;
706   }
707
708   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
709       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
710
711   if (queue->sinktime != GST_CLOCK_TIME_NONE
712       && queue->srctime != GST_CLOCK_TIME_NONE
713       && queue->sinktime >= queue->srctime)
714     queue->cur_level.time = queue->sinktime - queue->srctime;
715   else
716     queue->cur_level.time = 0;
717 }
718
719 /* take a SEGMENT event and apply the values to segment, updating the time
720  * level of queue. */
721 static void
722 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
723     gboolean is_sink)
724 {
725   gst_event_copy_segment (event, segment);
726
727   if (segment->format == GST_FORMAT_BYTES) {
728     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
729       /* start is where we'll be getting from and as such writing next */
730       queue->current = add_range (queue, segment->start, TRUE);
731     }
732   }
733
734   /* now configure the values, we use these to track timestamps on the
735    * sinkpad. */
736   if (segment->format != GST_FORMAT_TIME) {
737     /* non-time format, pretent the current time segment is closed with a
738      * 0 start and unknown stop time. */
739     segment->format = GST_FORMAT_TIME;
740     segment->start = 0;
741     segment->stop = -1;
742     segment->time = 0;
743   }
744
745   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
746
747   if (is_sink)
748     queue->sink_tainted = TRUE;
749   else
750     queue->src_tainted = TRUE;
751
752   /* segment can update the time level of the queue */
753   update_time_level (queue);
754 }
755
756 static void
757 apply_gap (GstQueue2 * queue, GstEvent * event,
758     GstSegment * segment, gboolean is_sink)
759 {
760   GstClockTime timestamp;
761   GstClockTime duration;
762
763   gst_event_parse_gap (event, &timestamp, &duration);
764
765   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
766
767     if (GST_CLOCK_TIME_IS_VALID (duration)) {
768       timestamp += duration;
769     }
770
771     segment->position = timestamp;
772
773     if (is_sink)
774       queue->sink_tainted = TRUE;
775     else
776       queue->src_tainted = TRUE;
777
778     /* calc diff with other end */
779     update_time_level (queue);
780   }
781 }
782
783 /* take a buffer and update segment, updating the time level of the queue. */
784 static void
785 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
786     gboolean is_sink)
787 {
788   GstClockTime duration, timestamp;
789
790   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
791   duration = GST_BUFFER_DURATION (buffer);
792
793   /* if no timestamp is set, assume it's continuous with the previous
794    * time */
795   if (timestamp == GST_CLOCK_TIME_NONE)
796     timestamp = segment->position;
797
798   /* add duration */
799   if (duration != GST_CLOCK_TIME_NONE)
800     timestamp += duration;
801
802   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
803       GST_TIME_ARGS (timestamp));
804
805   segment->position = timestamp;
806
807   if (is_sink)
808     queue->sink_tainted = TRUE;
809   else
810     queue->src_tainted = TRUE;
811
812   /* calc diff with other end */
813   update_time_level (queue);
814 }
815
816 static gboolean
817 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
818 {
819   GstClockTime *timestamp = data;
820   GstClockTime btime;
821
822   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
823       " duration %" GST_TIME_FORMAT, idx,
824       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
825       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
826       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
827
828   btime = GST_BUFFER_DTS_OR_PTS (*buf);
829   if (GST_CLOCK_TIME_IS_VALID (btime))
830     *timestamp = btime;
831
832   if (GST_BUFFER_DURATION_IS_VALID (*buf))
833     *timestamp += GST_BUFFER_DURATION (*buf);
834
835   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
836   return TRUE;
837 }
838
839 /* take a buffer list and update segment, updating the time level of the queue */
840 static void
841 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
842     GstSegment * segment, gboolean is_sink)
843 {
844   GstClockTime timestamp;
845
846   /* if no timestamp is set, assume it's continuous with the previous time */
847   timestamp = segment->position;
848
849   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
850
851   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
852       GST_TIME_ARGS (timestamp));
853
854   segment->position = timestamp;
855
856   if (is_sink)
857     queue->sink_tainted = TRUE;
858   else
859     queue->src_tainted = TRUE;
860
861   /* calc diff with other end */
862   update_time_level (queue);
863 }
864
865 static gboolean
866 get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering,
867     gint * percent)
868 {
869   gint perc;
870
871   if (queue->high_percent <= 0) {
872     if (percent)
873       *percent = 100;
874     if (is_buffering)
875       *is_buffering = FALSE;
876     return FALSE;
877   }
878 #define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
879
880   if (queue->is_eos) {
881     /* on EOS we are always 100% full, we set the var here so that it we can
882      * reuse the logic below to stop buffering */
883     perc = 100;
884     GST_LOG_OBJECT (queue, "we are EOS");
885   } else {
886     GST_LOG_OBJECT (queue,
887         "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
888         queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
889         queue->cur_level.buffers);
890
891     /* figure out the percent we are filled, we take the max of all formats. */
892     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
893       perc = GET_PERCENT (bytes, 0);
894     } else {
895       guint64 rb_size = queue->ring_buffer_max_size;
896       perc = GET_PERCENT (bytes, rb_size);
897     }
898     perc = MAX (perc, GET_PERCENT (time, 0));
899     perc = MAX (perc, GET_PERCENT (buffers, 0));
900
901     /* also apply the rate estimate when we need to */
902     if (queue->use_rate_estimate)
903       perc = MAX (perc, GET_PERCENT (rate_time, 0));
904
905     /* Don't get to 0% unless we're really empty */
906     if (queue->cur_level.bytes > 0)
907       perc = MAX (1, perc);
908   }
909 #undef GET_PERCENT
910
911   if (is_buffering)
912     *is_buffering = queue->is_buffering;
913
914   /* scale to high percent so that it becomes the 100% mark */
915   perc = perc * 100 / queue->high_percent;
916   /* clip */
917   if (perc > 100)
918     perc = 100;
919
920   if (percent)
921     *percent = perc;
922
923   GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering,
924       perc);
925
926   return TRUE;
927 }
928
929 static void
930 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
931     gint * avg_in, gint * avg_out, gint64 * buffering_left)
932 {
933   if (mode) {
934     if (!QUEUE_IS_USING_QUEUE (queue)) {
935       if (QUEUE_IS_USING_RING_BUFFER (queue))
936         *mode = GST_BUFFERING_TIMESHIFT;
937       else
938         *mode = GST_BUFFERING_DOWNLOAD;
939     } else {
940       *mode = GST_BUFFERING_STREAM;
941     }
942   }
943
944   if (avg_in)
945     *avg_in = queue->byte_in_rate;
946   if (avg_out)
947     *avg_out = queue->byte_out_rate;
948
949   if (buffering_left) {
950     *buffering_left = (percent == 100 ? 0 : -1);
951
952     if (queue->use_rate_estimate) {
953       guint64 max, cur;
954
955       max = queue->max_level.rate_time;
956       cur = queue->cur_level.rate_time;
957
958       if (percent != 100 && max > cur)
959         *buffering_left = (max - cur) / 1000000;
960     }
961   }
962 }
963
964 static void
965 gst_queue2_post_buffering (GstQueue2 * queue)
966 {
967   GstMessage *msg = NULL;
968
969   g_mutex_lock (&queue->buffering_post_lock);
970   GST_QUEUE2_MUTEX_LOCK (queue);
971   if (queue->percent_changed) {
972     gint percent = queue->buffering_percent;
973
974     queue->percent_changed = FALSE;
975
976     GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
977     msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
978
979     gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
980         queue->avg_out, queue->buffering_left);
981   }
982   GST_QUEUE2_MUTEX_UNLOCK (queue);
983
984   if (msg != NULL)
985     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
986
987   g_mutex_unlock (&queue->buffering_post_lock);
988 }
989
990 static void
991 update_buffering (GstQueue2 * queue)
992 {
993   gint percent;
994
995   /* Ensure the variables used to calculate buffering state are up-to-date. */
996   if (queue->current)
997     update_cur_level (queue, queue->current);
998   update_in_rates (queue);
999
1000   if (!get_buffering_percent (queue, NULL, &percent))
1001     return;
1002
1003   if (queue->is_buffering) {
1004     /* if we were buffering see if we reached the high watermark */
1005     if (percent >= 100)
1006       queue->is_buffering = FALSE;
1007
1008     SET_PERCENT (queue, percent);
1009   } else {
1010     /* we were not buffering, check if we need to start buffering if we drop
1011      * below the low threshold */
1012     if (percent < queue->low_percent) {
1013       queue->is_buffering = TRUE;
1014       SET_PERCENT (queue, percent);
1015     }
1016   }
1017 }
1018
1019 static void
1020 reset_rate_timer (GstQueue2 * queue)
1021 {
1022   queue->bytes_in = 0;
1023   queue->bytes_out = 0;
1024   queue->byte_in_rate = 0.0;
1025   queue->byte_in_period = 0;
1026   queue->byte_out_rate = 0.0;
1027   queue->last_update_in_rates_elapsed = 0.0;
1028   queue->last_in_elapsed = 0.0;
1029   queue->last_out_elapsed = 0.0;
1030   queue->in_timer_started = FALSE;
1031   queue->out_timer_started = FALSE;
1032 }
1033
1034 /* the interval in seconds to recalculate the rate */
1035 #define RATE_INTERVAL    0.2
1036 /* Tuning for rate estimation. We use a large window for the input rate because
1037  * it should be stable when connected to a network. The output rate is less
1038  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1039  * therefore adapt more quickly.
1040  * However, initial input rate may be subject to a burst, and should therefore
1041  * initially also adapt more quickly to changes, and only later on give higher
1042  * weight to previous values. */
1043 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1044 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1045
1046 static void
1047 update_in_rates (GstQueue2 * queue)
1048 {
1049   gdouble elapsed, period;
1050   gdouble byte_in_rate;
1051
1052   if (!queue->in_timer_started) {
1053     queue->in_timer_started = TRUE;
1054     g_timer_start (queue->in_timer);
1055     return;
1056   }
1057
1058   queue->last_update_in_rates_elapsed = elapsed =
1059       g_timer_elapsed (queue->in_timer, NULL);
1060
1061   /* recalc after each interval. */
1062   if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1063     period = elapsed - queue->last_in_elapsed;
1064
1065     GST_DEBUG_OBJECT (queue,
1066         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1067         period, queue->bytes_in, queue->byte_in_period);
1068
1069     byte_in_rate = queue->bytes_in / period;
1070
1071     if (queue->byte_in_rate == 0.0)
1072       queue->byte_in_rate = byte_in_rate;
1073     else
1074       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1075           (double) queue->byte_in_period, period);
1076
1077     /* another data point, cap at 16 for long time running average */
1078     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1079       queue->byte_in_period += period;
1080
1081     /* reset the values to calculate rate over the next interval */
1082     queue->last_in_elapsed = elapsed;
1083     queue->bytes_in = 0;
1084   }
1085
1086   if (queue->byte_in_rate > 0.0) {
1087     queue->cur_level.rate_time =
1088         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1089   }
1090   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1091       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1092 }
1093
1094 static void
1095 update_out_rates (GstQueue2 * queue)
1096 {
1097   gdouble elapsed, period;
1098   gdouble byte_out_rate;
1099
1100   if (!queue->out_timer_started) {
1101     queue->out_timer_started = TRUE;
1102     g_timer_start (queue->out_timer);
1103     return;
1104   }
1105
1106   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1107
1108   /* recalc after each interval. */
1109   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1110     period = elapsed - queue->last_out_elapsed;
1111
1112     GST_DEBUG_OBJECT (queue,
1113         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1114
1115     byte_out_rate = queue->bytes_out / period;
1116
1117     if (queue->byte_out_rate == 0.0)
1118       queue->byte_out_rate = byte_out_rate;
1119     else
1120       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1121
1122     /* reset the values to calculate rate over the next interval */
1123     queue->last_out_elapsed = elapsed;
1124     queue->bytes_out = 0;
1125   }
1126   if (queue->byte_in_rate > 0.0) {
1127     queue->cur_level.rate_time =
1128         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1129   }
1130   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1131       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1132 }
1133
1134 static void
1135 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1136 {
1137   guint64 reading_pos, max_reading_pos;
1138
1139   reading_pos = pos;
1140   max_reading_pos = range->max_reading_pos;
1141
1142   max_reading_pos = MAX (max_reading_pos, reading_pos);
1143
1144   GST_DEBUG_OBJECT (queue,
1145       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1146       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1147   range->max_reading_pos = max_reading_pos;
1148
1149   update_cur_level (queue, range);
1150 }
1151
1152 static gboolean
1153 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1154 {
1155   GstEvent *event;
1156   gboolean res;
1157
1158   /* until we receive the FLUSH_STOP from this seek, we skip data */
1159   queue->seeking = TRUE;
1160   GST_QUEUE2_MUTEX_UNLOCK (queue);
1161
1162   debug_ranges (queue);
1163
1164   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1165
1166   event =
1167       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1168       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1169       GST_SEEK_TYPE_NONE, -1);
1170
1171   res = gst_pad_push_event (queue->sinkpad, event);
1172   GST_QUEUE2_MUTEX_LOCK (queue);
1173
1174   if (res) {
1175     /* Between us sending the seek event and re-acquiring the lock, the source
1176      * thread might already have pushed data and moved along the range's
1177      * writing_pos beyond the seek offset. In that case we don't want to set
1178      * the writing position back to the requested seek position, as it would
1179      * cause data to be written to the wrong offset in the file or ring buffer.
1180      * We still do the add_range call to switch the current range to the
1181      * requested range, or create one if one doesn't exist yet. */
1182     queue->current = add_range (queue, offset, FALSE);
1183   }
1184
1185   return res;
1186 }
1187
1188 /* get the threshold for when we decide to seek rather than wait */
1189 static guint64
1190 get_seek_threshold (GstQueue2 * queue)
1191 {
1192   guint64 threshold;
1193
1194   /* FIXME, find a good threshold based on the incoming rate. */
1195   threshold = 1024 * 512;
1196
1197   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1198     threshold = MIN (threshold,
1199         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1200   }
1201   return threshold;
1202 }
1203
1204 /* see if there is enough data in the file to read a full buffer */
1205 static gboolean
1206 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1207 {
1208   GstQueue2Range *range;
1209
1210   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1211       offset, length);
1212
1213   if ((range = find_range (queue, offset))) {
1214     if (queue->current != range) {
1215       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1216       perform_seek_to_offset (queue, range->writing_pos);
1217     }
1218
1219     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1220         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1221
1222     /* we have a range for offset */
1223     GST_DEBUG_OBJECT (queue,
1224         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1225         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1226
1227     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1228       return TRUE;
1229
1230     if (offset + length <= range->writing_pos)
1231       return TRUE;
1232     else
1233       GST_DEBUG_OBJECT (queue,
1234           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1235           (offset + length) - range->writing_pos);
1236
1237   } else {
1238     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1239         " len %u", offset, length);
1240     /* we don't have the range, see how far away we are */
1241     if (!queue->is_eos && queue->current) {
1242       guint64 threshold = get_seek_threshold (queue);
1243
1244       if (offset >= queue->current->offset && offset <=
1245           queue->current->writing_pos + threshold) {
1246         GST_INFO_OBJECT (queue,
1247             "requested data is within range, wait for data");
1248         return FALSE;
1249       }
1250     }
1251
1252     /* too far away, do a seek */
1253     perform_seek_to_offset (queue, offset);
1254   }
1255
1256   return FALSE;
1257 }
1258
1259 #ifdef HAVE_FSEEKO
1260 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1261 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1262 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1263 #else
1264 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1265 #endif
1266
1267 static GstFlowReturn
1268 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1269     guint8 * dst, gint64 * read_return)
1270 {
1271   guint8 *ring_buffer;
1272   size_t res;
1273
1274   ring_buffer = queue->ring_buffer;
1275
1276   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1277     goto seek_failed;
1278
1279   /* this should not block */
1280   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1281       length, offset);
1282   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1283     res = fread (dst, 1, length, queue->temp_file);
1284   } else {
1285     memcpy (dst, ring_buffer + offset, length);
1286     res = length;
1287   }
1288
1289   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1290
1291   if (G_UNLIKELY (res < length)) {
1292     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1293       goto could_not_read;
1294     /* check for errors or EOF */
1295     if (ferror (queue->temp_file))
1296       goto could_not_read;
1297     if (feof (queue->temp_file) && length > 0)
1298       goto eos;
1299   }
1300
1301   *read_return = res;
1302
1303   return GST_FLOW_OK;
1304
1305 seek_failed:
1306   {
1307     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1308     return GST_FLOW_ERROR;
1309   }
1310 could_not_read:
1311   {
1312     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1313     return GST_FLOW_ERROR;
1314   }
1315 eos:
1316   {
1317     GST_DEBUG ("non-regular file hits EOS");
1318     return GST_FLOW_EOS;
1319   }
1320 }
1321
1322 static GstFlowReturn
1323 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1324     GstBuffer ** buffer)
1325 {
1326   GstBuffer *buf;
1327   GstMapInfo info;
1328   guint8 *data;
1329   guint64 file_offset;
1330   guint block_length, remaining, read_length;
1331   guint64 rb_size;
1332   guint64 max_size;
1333   guint64 rpos;
1334   GstFlowReturn ret = GST_FLOW_OK;
1335
1336   /* allocate the output buffer of the requested size */
1337   if (*buffer == NULL)
1338     buf = gst_buffer_new_allocate (NULL, length, NULL);
1339   else
1340     buf = *buffer;
1341
1342   gst_buffer_map (buf, &info, GST_MAP_WRITE);
1343   data = info.data;
1344
1345   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1346       offset);
1347
1348   rpos = offset;
1349   rb_size = queue->ring_buffer_max_size;
1350   max_size = QUEUE_MAX_BYTES (queue);
1351
1352   remaining = length;
1353   while (remaining > 0) {
1354     /* configure how much/whether to read */
1355     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1356       read_length = 0;
1357
1358       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1359         guint64 level;
1360
1361         /* calculate how far away the offset is */
1362         if (queue->current->writing_pos > rpos)
1363           level = queue->current->writing_pos - rpos;
1364         else
1365           level = 0;
1366
1367         GST_DEBUG_OBJECT (queue,
1368             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1369             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1370             rpos, queue->current->writing_pos, level, max_size);
1371
1372         if (level >= max_size) {
1373           /* we don't have the data but if we have a ring buffer that is full, we
1374            * need to read */
1375           GST_DEBUG_OBJECT (queue,
1376               "ring buffer full, reading QUEUE_MAX_BYTES %"
1377               G_GUINT64_FORMAT " bytes", max_size);
1378           read_length = max_size;
1379         } else if (queue->is_eos) {
1380           /* won't get any more data so read any data we have */
1381           if (level) {
1382             GST_DEBUG_OBJECT (queue,
1383                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1384                 level);
1385             read_length = level;
1386             remaining = level;
1387             length = level;
1388           } else
1389             goto hit_eos;
1390         }
1391       }
1392
1393       if (read_length == 0) {
1394         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1395           GST_DEBUG_OBJECT (queue,
1396               "update current position [%" G_GUINT64_FORMAT "-%"
1397               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1398           update_cur_pos (queue, queue->current, rpos);
1399           GST_QUEUE2_SIGNAL_DEL (queue);
1400         }
1401
1402         if (queue->use_buffering)
1403           update_buffering (queue);
1404
1405         GST_DEBUG_OBJECT (queue, "waiting for add");
1406         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1407         continue;
1408       }
1409     } else {
1410       /* we have the requested data so read it */
1411       read_length = remaining;
1412     }
1413
1414     /* set range reading_pos to actual reading position for this read */
1415     queue->current->reading_pos = rpos;
1416
1417     /* configure how much and from where to read */
1418     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1419       file_offset =
1420           (queue->current->rb_offset + (rpos -
1421               queue->current->offset)) % rb_size;
1422       if (file_offset + read_length > rb_size) {
1423         block_length = rb_size - file_offset;
1424       } else {
1425         block_length = read_length;
1426       }
1427     } else {
1428       file_offset = rpos;
1429       block_length = read_length;
1430     }
1431
1432     /* while we still have data to read, we loop */
1433     while (read_length > 0) {
1434       gint64 read_return;
1435
1436       ret =
1437           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1438           data, &read_return);
1439       if (ret != GST_FLOW_OK)
1440         goto read_error;
1441
1442       file_offset += read_return;
1443       if (QUEUE_IS_USING_RING_BUFFER (queue))
1444         file_offset %= rb_size;
1445
1446       data += read_return;
1447       read_length -= read_return;
1448       block_length = read_length;
1449       remaining -= read_return;
1450
1451       rpos = (queue->current->reading_pos += read_return);
1452       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1453     }
1454     GST_QUEUE2_SIGNAL_DEL (queue);
1455     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1456   }
1457
1458   gst_buffer_unmap (buf, &info);
1459   gst_buffer_resize (buf, 0, length);
1460
1461   GST_BUFFER_OFFSET (buf) = offset;
1462   GST_BUFFER_OFFSET_END (buf) = offset + length;
1463
1464   *buffer = buf;
1465
1466   return ret;
1467
1468   /* ERRORS */
1469 hit_eos:
1470   {
1471     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1472     gst_buffer_unmap (buf, &info);
1473     if (*buffer == NULL)
1474       gst_buffer_unref (buf);
1475     return GST_FLOW_EOS;
1476   }
1477 out_flushing:
1478   {
1479     GST_DEBUG_OBJECT (queue, "we are flushing");
1480     gst_buffer_unmap (buf, &info);
1481     if (*buffer == NULL)
1482       gst_buffer_unref (buf);
1483     return GST_FLOW_FLUSHING;
1484   }
1485 read_error:
1486   {
1487     GST_DEBUG_OBJECT (queue, "we have a read error");
1488     gst_buffer_unmap (buf, &info);
1489     if (*buffer == NULL)
1490       gst_buffer_unref (buf);
1491     return ret;
1492   }
1493 }
1494
1495 /* should be called with QUEUE_LOCK */
1496 static GstMiniObject *
1497 gst_queue2_read_item_from_file (GstQueue2 * queue)
1498 {
1499   GstMiniObject *item;
1500
1501   if (queue->stream_start_event != NULL) {
1502     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1503     queue->stream_start_event = NULL;
1504   } else if (queue->starting_segment != NULL) {
1505     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1506     queue->starting_segment = NULL;
1507   } else {
1508     GstFlowReturn ret;
1509     GstBuffer *buffer = NULL;
1510     guint64 reading_pos;
1511
1512     reading_pos = queue->current->reading_pos;
1513
1514     ret =
1515         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1516         &buffer);
1517
1518     switch (ret) {
1519       case GST_FLOW_OK:
1520         item = GST_MINI_OBJECT_CAST (buffer);
1521         break;
1522       case GST_FLOW_EOS:
1523         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1524         break;
1525       default:
1526         item = NULL;
1527         break;
1528     }
1529   }
1530   return item;
1531 }
1532
1533 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1534  * the temp filename. */
1535 static gboolean
1536 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1537 {
1538   gint fd = -1;
1539   gchar *name = NULL;
1540
1541   if (queue->temp_file)
1542     goto already_opened;
1543
1544   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1545
1546   /* If temp_template was set, allocate a filename and open that filen */
1547
1548   /* nothing to do */
1549   if (queue->temp_template == NULL)
1550     goto no_directory;
1551
1552   /* make copy of the template, we don't want to change this */
1553   name = g_strdup (queue->temp_template);
1554
1555 #ifdef __BIONIC__
1556   fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1557 #else
1558   fd = g_mkstemp (name);
1559 #endif
1560
1561   if (fd == -1)
1562     goto mkstemp_failed;
1563
1564   /* open the file for update/writing */
1565   queue->temp_file = fdopen (fd, "wb+");
1566   /* error creating file */
1567   if (queue->temp_file == NULL)
1568     goto open_failed;
1569
1570   g_free (queue->temp_location);
1571   queue->temp_location = name;
1572
1573   GST_QUEUE2_MUTEX_UNLOCK (queue);
1574
1575   /* we can't emit the notify with the lock */
1576   g_object_notify (G_OBJECT (queue), "temp-location");
1577
1578   GST_QUEUE2_MUTEX_LOCK (queue);
1579
1580   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1581
1582   return TRUE;
1583
1584   /* ERRORS */
1585 already_opened:
1586   {
1587     GST_DEBUG_OBJECT (queue, "temp file was already open");
1588     return TRUE;
1589   }
1590 no_directory:
1591   {
1592     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1593         (_("No Temp directory specified.")), (NULL));
1594     return FALSE;
1595   }
1596 mkstemp_failed:
1597   {
1598     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1599         (_("Could not create temp file \"%s\"."), queue->temp_template),
1600         GST_ERROR_SYSTEM);
1601     g_free (name);
1602     return FALSE;
1603   }
1604 open_failed:
1605   {
1606     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1607         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1608     g_free (name);
1609     if (fd != -1)
1610       close (fd);
1611     return FALSE;
1612   }
1613 }
1614
1615 static void
1616 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1617 {
1618   /* nothing to do */
1619   if (queue->temp_file == NULL)
1620     return;
1621
1622   GST_DEBUG_OBJECT (queue, "closing temp file");
1623
1624   fflush (queue->temp_file);
1625   fclose (queue->temp_file);
1626
1627   if (queue->temp_remove) {
1628     if (remove (queue->temp_location) < 0) {
1629       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1630           queue->temp_location, g_strerror (errno));
1631     }
1632   }
1633
1634   queue->temp_file = NULL;
1635   clean_ranges (queue);
1636 }
1637
1638 static void
1639 gst_queue2_flush_temp_file (GstQueue2 * queue)
1640 {
1641   if (queue->temp_file == NULL)
1642     return;
1643
1644   GST_DEBUG_OBJECT (queue, "flushing temp file");
1645
1646   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1647 }
1648
1649 static void
1650 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1651 {
1652   if (!QUEUE_IS_USING_QUEUE (queue)) {
1653     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1654       gst_queue2_flush_temp_file (queue);
1655     init_ranges (queue);
1656   } else {
1657     while (!g_queue_is_empty (&queue->queue)) {
1658       GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1659
1660       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1661           && GST_EVENT_IS_STICKY (qitem->item)
1662           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1663           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1664         gst_pad_store_sticky_event (queue->srcpad,
1665             GST_EVENT_CAST (qitem->item));
1666       }
1667
1668       /* Then lose another reference because we are supposed to destroy that
1669          data when flushing */
1670       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1671         gst_mini_object_unref (qitem->item);
1672       g_slice_free (GstQueue2Item, qitem);
1673     }
1674   }
1675   queue->last_query = FALSE;
1676   g_cond_signal (&queue->query_handled);
1677   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1678   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1679   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1680   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1681   queue->sink_tainted = queue->src_tainted = TRUE;
1682   if (queue->starting_segment != NULL)
1683     gst_event_unref (queue->starting_segment);
1684   queue->starting_segment = NULL;
1685   queue->segment_event_received = FALSE;
1686   gst_event_replace (&queue->stream_start_event, NULL);
1687
1688   /* we deleted a lot of something */
1689   GST_QUEUE2_SIGNAL_DEL (queue);
1690 }
1691
1692 static gboolean
1693 gst_queue2_wait_free_space (GstQueue2 * queue)
1694 {
1695   /* We make space available if we're "full" according to whatever
1696    * the user defined as "full". */
1697   if (gst_queue2_is_filled (queue)) {
1698     gboolean started;
1699
1700     /* pause the timer while we wait. The fact that we are waiting does not mean
1701      * the byterate on the input pad is lower */
1702     if ((started = queue->in_timer_started))
1703       g_timer_stop (queue->in_timer);
1704
1705     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1706         "queue is full, waiting for free space");
1707     do {
1708       GST_QUEUE2_MUTEX_UNLOCK (queue);
1709       g_signal_emit (queue, gst_queue2_signals[SIGNAL_OVERRUN], 0);
1710       GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
1711       /* we recheck, the signal could have changed the thresholds */
1712       if (!gst_queue2_is_filled (queue))
1713         break;
1714
1715       /* Wait for space to be available, we could be unlocked because of a flush. */
1716       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1717     }
1718     while (gst_queue2_is_filled (queue));
1719
1720     /* and continue if we were running before */
1721     if (started)
1722       g_timer_continue (queue->in_timer);
1723   }
1724   return TRUE;
1725
1726   /* ERRORS */
1727 out_flushing:
1728   {
1729     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1730     return FALSE;
1731   }
1732 }
1733
1734 static gboolean
1735 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1736 {
1737   GstMapInfo info;
1738   guint8 *data, *ring_buffer;
1739   guint size, rb_size;
1740   guint64 writing_pos, new_writing_pos;
1741   GstQueue2Range *range, *prev, *next;
1742   gboolean do_seek = FALSE;
1743
1744   if (QUEUE_IS_USING_RING_BUFFER (queue))
1745     writing_pos = queue->current->rb_writing_pos;
1746   else
1747     writing_pos = queue->current->writing_pos;
1748   ring_buffer = queue->ring_buffer;
1749   rb_size = queue->ring_buffer_max_size;
1750
1751   gst_buffer_map (buffer, &info, GST_MAP_READ);
1752
1753   size = info.size;
1754   data = info.data;
1755
1756   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1757       writing_pos);
1758
1759   /* sanity check */
1760   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1761       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1762     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1763         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1764         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1765   }
1766
1767   while (size > 0) {
1768     guint to_write;
1769
1770     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1771       gint64 space;
1772
1773       /* calculate the space in the ring buffer not used by data from
1774        * the current range */
1775       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1776         /* wait until there is some free space */
1777         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1778       }
1779       /* get the amount of space we have */
1780       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1781
1782       /* calculate if we need to split or if we can write the entire
1783        * buffer now */
1784       to_write = MIN (size, space);
1785
1786       /* the writing position in the ring buffer after writing (part
1787        * or all of) the buffer */
1788       new_writing_pos = (writing_pos + to_write) % rb_size;
1789
1790       prev = NULL;
1791       range = queue->ranges;
1792
1793       /* if we need to overwrite data in the ring buffer, we need to
1794        * update the ranges
1795        *
1796        * warning: this code is complicated and includes some
1797        * simplifications - pen, paper and diagrams for the cases
1798        * recommended! */
1799       while (range) {
1800         guint64 range_data_start, range_data_end;
1801         GstQueue2Range *range_to_destroy = NULL;
1802
1803         range_data_start = range->rb_offset;
1804         range_data_end = range->rb_writing_pos;
1805
1806         /* handle the special case where the range has no data in it */
1807         if (range->writing_pos == range->offset) {
1808           if (range != queue->current) {
1809             GST_DEBUG_OBJECT (queue,
1810                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1811                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1812             /* remove range */
1813             range_to_destroy = range;
1814             if (prev)
1815               prev->next = range->next;
1816           }
1817           goto next_range;
1818         }
1819
1820         if (range_data_end > range_data_start) {
1821           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1822             goto next_range;
1823
1824           if (new_writing_pos > range_data_start) {
1825             if (new_writing_pos >= range_data_end) {
1826               GST_DEBUG_OBJECT (queue,
1827                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1828                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1829               /* remove range */
1830               range_to_destroy = range;
1831               if (prev)
1832                 prev->next = range->next;
1833             } else {
1834               GST_DEBUG_OBJECT (queue,
1835                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1836                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1837                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1838                   range->offset + new_writing_pos - range_data_start,
1839                   new_writing_pos);
1840               range->offset += (new_writing_pos - range_data_start);
1841               range->rb_offset = new_writing_pos;
1842             }
1843           }
1844         } else {
1845           guint64 new_wpos_virt = writing_pos + to_write;
1846
1847           if (new_wpos_virt <= range_data_start)
1848             goto next_range;
1849
1850           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1851             GST_DEBUG_OBJECT (queue,
1852                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1853                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1854             /* remove range */
1855             range_to_destroy = range;
1856             if (prev)
1857               prev->next = range->next;
1858           } else {
1859             GST_DEBUG_OBJECT (queue,
1860                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1861                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1862                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1863                 range->offset + new_writing_pos - range_data_start,
1864                 new_writing_pos);
1865             range->offset += (new_wpos_virt - range_data_start);
1866             range->rb_offset = new_writing_pos;
1867           }
1868         }
1869
1870       next_range:
1871         if (!range_to_destroy)
1872           prev = range;
1873
1874         range = range->next;
1875         if (range_to_destroy) {
1876           if (range_to_destroy == queue->ranges)
1877             queue->ranges = range;
1878           g_slice_free (GstQueue2Range, range_to_destroy);
1879           range_to_destroy = NULL;
1880         }
1881       }
1882     } else {
1883       to_write = size;
1884       new_writing_pos = writing_pos + to_write;
1885     }
1886
1887     if (QUEUE_IS_USING_TEMP_FILE (queue)
1888         && FSEEK_FILE (queue->temp_file, writing_pos))
1889       goto seek_failed;
1890
1891     if (new_writing_pos > writing_pos) {
1892       GST_INFO_OBJECT (queue,
1893           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1894           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
1895           queue->current->writing_pos, queue->current->rb_writing_pos);
1896       /* either not using ring buffer or no wrapping, just write */
1897       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1898         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
1899           goto handle_error;
1900       } else {
1901         memcpy (ring_buffer + writing_pos, data, to_write);
1902       }
1903
1904       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1905         /* try to merge with next range */
1906         while ((next = queue->current->next)) {
1907           GST_INFO_OBJECT (queue,
1908               "checking merge with next range %" G_GUINT64_FORMAT " < %"
1909               G_GUINT64_FORMAT, new_writing_pos, next->offset);
1910           if (new_writing_pos < next->offset)
1911             break;
1912
1913           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
1914               next->writing_pos);
1915
1916           /* remove the group */
1917           queue->current->next = next->next;
1918
1919           /* We use the threshold to decide if we want to do a seek or simply
1920            * read the data again. If there is not so much data in the range we
1921            * prefer to avoid to seek and read it again. */
1922           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
1923             /* the new range had more data than the threshold, it's worth keeping
1924              * it and doing a seek. */
1925             new_writing_pos = next->writing_pos;
1926             do_seek = TRUE;
1927           }
1928           g_slice_free (GstQueue2Range, next);
1929         }
1930         goto update_and_signal;
1931       }
1932     } else {
1933       /* wrapping */
1934       guint block_one, block_two;
1935
1936       block_one = rb_size - writing_pos;
1937       block_two = to_write - block_one;
1938
1939       if (block_one > 0) {
1940         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
1941         /* write data to end of ring buffer */
1942         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1943           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
1944             goto handle_error;
1945         } else {
1946           memcpy (ring_buffer + writing_pos, data, block_one);
1947         }
1948       }
1949
1950       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
1951         goto seek_failed;
1952
1953       if (block_two > 0) {
1954         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
1955         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1956           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
1957             goto handle_error;
1958         } else {
1959           memcpy (ring_buffer, data + block_one, block_two);
1960         }
1961       }
1962     }
1963
1964   update_and_signal:
1965     /* update the writing positions */
1966     size -= to_write;
1967     GST_INFO_OBJECT (queue,
1968         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
1969         to_write, writing_pos, size);
1970
1971     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1972       data += to_write;
1973       queue->current->writing_pos += to_write;
1974       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
1975     } else {
1976       queue->current->writing_pos = writing_pos = new_writing_pos;
1977     }
1978     if (do_seek)
1979       perform_seek_to_offset (queue, new_writing_pos);
1980
1981     update_cur_level (queue, queue->current);
1982
1983     /* update the buffering status */
1984     if (queue->use_buffering)
1985       update_buffering (queue);
1986
1987     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1988         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1989
1990     GST_QUEUE2_SIGNAL_ADD (queue);
1991   }
1992
1993   gst_buffer_unmap (buffer, &info);
1994
1995   return TRUE;
1996
1997   /* ERRORS */
1998 out_flushing:
1999   {
2000     GST_DEBUG_OBJECT (queue, "we are flushing");
2001     gst_buffer_unmap (buffer, &info);
2002     /* FIXME - GST_FLOW_EOS ? */
2003     return FALSE;
2004   }
2005 seek_failed:
2006   {
2007     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2008     gst_buffer_unmap (buffer, &info);
2009     return FALSE;
2010   }
2011 handle_error:
2012   {
2013     switch (errno) {
2014       case ENOSPC:{
2015         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2016         break;
2017       }
2018       default:{
2019         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2020             (_("Error while writing to download file.")),
2021             ("%s", g_strerror (errno)));
2022       }
2023     }
2024     gst_buffer_unmap (buffer, &info);
2025     return FALSE;
2026   }
2027 }
2028
2029 static gboolean
2030 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2031 {
2032   GstQueue2 *queue = q;
2033
2034   GST_TRACE_OBJECT (queue,
2035       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2036       gst_buffer_get_size (*buf));
2037
2038   if (!gst_queue2_create_write (queue, *buf)) {
2039     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2040     return FALSE;
2041   }
2042   return TRUE;
2043 }
2044
2045 static gboolean
2046 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
2047 {
2048   guint *p_size = data;
2049   gsize buf_size;
2050
2051   buf_size = gst_buffer_get_size (*buf);
2052   GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
2053   *p_size += buf_size;
2054   return TRUE;
2055 }
2056
2057 /* enqueue an item an update the level stats */
2058 static void
2059 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2060     GstQueue2ItemType item_type)
2061 {
2062   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2063     GstBuffer *buffer;
2064     guint size;
2065
2066     buffer = GST_BUFFER_CAST (item);
2067     size = gst_buffer_get_size (buffer);
2068
2069     /* add buffer to the statistics */
2070     if (QUEUE_IS_USING_QUEUE (queue)) {
2071       queue->cur_level.buffers++;
2072       queue->cur_level.bytes += size;
2073     }
2074     queue->bytes_in += size;
2075
2076     /* apply new buffer to segment stats */
2077     apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
2078     /* update the byterate stats */
2079     update_in_rates (queue);
2080
2081     if (!QUEUE_IS_USING_QUEUE (queue)) {
2082       /* FIXME - check return value? */
2083       gst_queue2_create_write (queue, buffer);
2084     }
2085   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2086     GstBufferList *buffer_list;
2087     guint size = 0;
2088
2089     buffer_list = GST_BUFFER_LIST_CAST (item);
2090
2091     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2092     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2093
2094     /* add buffer to the statistics */
2095     if (QUEUE_IS_USING_QUEUE (queue)) {
2096       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2097       queue->cur_level.bytes += size;
2098     }
2099     queue->bytes_in += size;
2100
2101     /* apply new buffer to segment stats */
2102     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2103
2104     /* update the byterate stats */
2105     update_in_rates (queue);
2106
2107     if (!QUEUE_IS_USING_QUEUE (queue)) {
2108       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2109     }
2110   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2111     GstEvent *event;
2112
2113     event = GST_EVENT_CAST (item);
2114
2115     switch (GST_EVENT_TYPE (event)) {
2116       case GST_EVENT_EOS:
2117         /* Zero the thresholds, this makes sure the queue is completely
2118          * filled and we can read all data from the queue. */
2119         GST_DEBUG_OBJECT (queue, "we have EOS");
2120         queue->is_eos = TRUE;
2121         break;
2122       case GST_EVENT_SEGMENT:
2123         apply_segment (queue, event, &queue->sink_segment, TRUE);
2124         /* This is our first new segment, we hold it
2125          * as we can't save it on the temp file */
2126         if (!QUEUE_IS_USING_QUEUE (queue)) {
2127           if (queue->segment_event_received)
2128             goto unexpected_event;
2129
2130           queue->segment_event_received = TRUE;
2131           if (queue->starting_segment != NULL)
2132             gst_event_unref (queue->starting_segment);
2133           queue->starting_segment = event;
2134           item = NULL;
2135         }
2136         /* a new segment allows us to accept more buffers if we got EOS
2137          * from downstream */
2138         queue->unexpected = FALSE;
2139         break;
2140       case GST_EVENT_GAP:
2141         apply_gap (queue, event, &queue->sink_segment, TRUE);
2142         break;
2143       case GST_EVENT_STREAM_START:
2144         if (!QUEUE_IS_USING_QUEUE (queue)) {
2145           gst_event_replace (&queue->stream_start_event, event);
2146           gst_event_unref (event);
2147           item = NULL;
2148         }
2149         break;
2150       case GST_EVENT_CAPS:{
2151         GstCaps *caps;
2152
2153         gst_event_parse_caps (event, &caps);
2154         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2155
2156         if (!QUEUE_IS_USING_QUEUE (queue)) {
2157           GST_LOG ("Dropping caps event, not using queue");
2158           gst_event_unref (event);
2159           item = NULL;
2160         }
2161         break;
2162       }
2163       default:
2164         if (!QUEUE_IS_USING_QUEUE (queue))
2165           goto unexpected_event;
2166         break;
2167     }
2168   } else if (GST_IS_QUERY (item)) {
2169     /* Can't happen as we check that in the caller */
2170     if (!QUEUE_IS_USING_QUEUE (queue))
2171       g_assert_not_reached ();
2172   } else {
2173     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2174         item, GST_OBJECT_NAME (queue));
2175     /* we can't really unref since we don't know what it is */
2176     item = NULL;
2177   }
2178
2179   if (item) {
2180     /* update the buffering status */
2181     if (queue->use_buffering)
2182       update_buffering (queue);
2183
2184     if (QUEUE_IS_USING_QUEUE (queue)) {
2185       GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2186       qitem->type = item_type;
2187       qitem->item = item;
2188       g_queue_push_tail (&queue->queue, qitem);
2189     } else {
2190       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2191     }
2192
2193     GST_QUEUE2_SIGNAL_ADD (queue);
2194   }
2195
2196   return;
2197
2198   /* ERRORS */
2199 unexpected_event:
2200   {
2201     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2202
2203     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2204         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2205         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2206     gst_event_unref (GST_EVENT_CAST (item));
2207     return;
2208   }
2209 }
2210
2211 /* dequeue an item from the queue and update level stats */
2212 static GstMiniObject *
2213 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2214 {
2215   GstMiniObject *item;
2216
2217   if (!QUEUE_IS_USING_QUEUE (queue)) {
2218     item = gst_queue2_read_item_from_file (queue);
2219   } else {
2220     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2221
2222     if (qitem == NULL)
2223       goto no_item;
2224
2225     item = qitem->item;
2226     g_slice_free (GstQueue2Item, qitem);
2227   }
2228
2229   if (item == NULL)
2230     goto no_item;
2231
2232   if (GST_IS_BUFFER (item)) {
2233     GstBuffer *buffer;
2234     guint size;
2235
2236     buffer = GST_BUFFER_CAST (item);
2237     size = gst_buffer_get_size (buffer);
2238     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2239
2240     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2241         "retrieved buffer %p from queue", buffer);
2242
2243     if (QUEUE_IS_USING_QUEUE (queue)) {
2244       queue->cur_level.buffers--;
2245       queue->cur_level.bytes -= size;
2246     }
2247     queue->bytes_out += size;
2248
2249     apply_buffer (queue, buffer, &queue->src_segment, FALSE);
2250     /* update the byterate stats */
2251     update_out_rates (queue);
2252     /* update the buffering */
2253     if (queue->use_buffering)
2254       update_buffering (queue);
2255
2256   } else if (GST_IS_EVENT (item)) {
2257     GstEvent *event = GST_EVENT_CAST (item);
2258
2259     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2260
2261     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2262         "retrieved event %p from queue", event);
2263
2264     switch (GST_EVENT_TYPE (event)) {
2265       case GST_EVENT_EOS:
2266         /* queue is empty now that we dequeued the EOS */
2267         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2268         break;
2269       case GST_EVENT_SEGMENT:
2270         apply_segment (queue, event, &queue->src_segment, FALSE);
2271         break;
2272       case GST_EVENT_GAP:
2273         apply_gap (queue, event, &queue->src_segment, FALSE);
2274         break;
2275       default:
2276         break;
2277     }
2278   } else if (GST_IS_BUFFER_LIST (item)) {
2279     GstBufferList *buffer_list;
2280     guint size = 0;
2281
2282     buffer_list = GST_BUFFER_LIST_CAST (item);
2283     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2284     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2285
2286     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2287         "retrieved buffer list %p from queue", buffer_list);
2288
2289     if (QUEUE_IS_USING_QUEUE (queue)) {
2290       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2291       queue->cur_level.bytes -= size;
2292     }
2293     queue->bytes_out += size;
2294
2295     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2296     /* update the byterate stats */
2297     update_out_rates (queue);
2298     /* update the buffering */
2299     if (queue->use_buffering)
2300       update_buffering (queue);
2301   } else if (GST_IS_QUERY (item)) {
2302     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2303         "retrieved query %p from queue", item);
2304     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2305   } else {
2306     g_warning
2307         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2308         item, GST_OBJECT_NAME (queue));
2309     item = NULL;
2310     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2311   }
2312   GST_QUEUE2_SIGNAL_DEL (queue);
2313
2314   return item;
2315
2316   /* ERRORS */
2317 no_item:
2318   {
2319     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2320     return NULL;
2321   }
2322 }
2323
2324 static gboolean
2325 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2326     GstEvent * event)
2327 {
2328   gboolean ret = TRUE;
2329   GstQueue2 *queue;
2330
2331   queue = GST_QUEUE2 (parent);
2332
2333   switch (GST_EVENT_TYPE (event)) {
2334     case GST_EVENT_FLUSH_START:
2335     {
2336       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2337       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2338         /* forward event */
2339         ret = gst_pad_push_event (queue->srcpad, event);
2340
2341         /* now unblock the chain function */
2342         GST_QUEUE2_MUTEX_LOCK (queue);
2343         queue->srcresult = GST_FLOW_FLUSHING;
2344         queue->sinkresult = GST_FLOW_FLUSHING;
2345         /* unblock the loop and chain functions */
2346         GST_QUEUE2_SIGNAL_ADD (queue);
2347         GST_QUEUE2_SIGNAL_DEL (queue);
2348         queue->last_query = FALSE;
2349         g_cond_signal (&queue->query_handled);
2350         GST_QUEUE2_MUTEX_UNLOCK (queue);
2351
2352         /* make sure it pauses, this should happen since we sent
2353          * flush_start downstream. */
2354         gst_pad_pause_task (queue->srcpad);
2355         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2356       } else {
2357         GST_QUEUE2_MUTEX_LOCK (queue);
2358         /* flush the sink pad */
2359         queue->sinkresult = GST_FLOW_FLUSHING;
2360         GST_QUEUE2_SIGNAL_DEL (queue);
2361         queue->last_query = FALSE;
2362         g_cond_signal (&queue->query_handled);
2363         GST_QUEUE2_MUTEX_UNLOCK (queue);
2364
2365         gst_event_unref (event);
2366       }
2367       break;
2368     }
2369     case GST_EVENT_FLUSH_STOP:
2370     {
2371       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2372
2373       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2374         /* forward event */
2375         ret = gst_pad_push_event (queue->srcpad, event);
2376
2377         GST_QUEUE2_MUTEX_LOCK (queue);
2378         gst_queue2_locked_flush (queue, FALSE, TRUE);
2379         queue->srcresult = GST_FLOW_OK;
2380         queue->sinkresult = GST_FLOW_OK;
2381         queue->is_eos = FALSE;
2382         queue->unexpected = FALSE;
2383         queue->seeking = FALSE;
2384         /* reset rate counters */
2385         reset_rate_timer (queue);
2386         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2387             queue->srcpad, NULL);
2388         GST_QUEUE2_MUTEX_UNLOCK (queue);
2389       } else {
2390         GST_QUEUE2_MUTEX_LOCK (queue);
2391         queue->segment_event_received = FALSE;
2392         queue->is_eos = FALSE;
2393         queue->unexpected = FALSE;
2394         queue->sinkresult = GST_FLOW_OK;
2395         queue->seeking = FALSE;
2396         GST_QUEUE2_MUTEX_UNLOCK (queue);
2397
2398         gst_event_unref (event);
2399       }
2400       break;
2401     }
2402     default:
2403       if (GST_EVENT_IS_SERIALIZED (event)) {
2404         /* serialized events go in the queue */
2405         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2406         if (queue->srcresult != GST_FLOW_OK) {
2407           /* Errors in sticky event pushing are no problem and ignored here
2408            * as they will cause more meaningful errors during data flow.
2409            * For EOS events, that are not followed by data flow, we still
2410            * return FALSE here though and report an error.
2411            */
2412           if (!GST_EVENT_IS_STICKY (event)) {
2413             goto out_flow_error;
2414           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2415             if (queue->srcresult == GST_FLOW_NOT_LINKED
2416                 || queue->srcresult < GST_FLOW_EOS) {
2417               GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2418                   (_("Internal data flow error.")),
2419                   ("streaming task paused, reason %s (%d)",
2420                       gst_flow_get_name (queue->srcresult), queue->srcresult));
2421             }
2422             goto out_flow_error;
2423           }
2424         }
2425         /* refuse more events on EOS */
2426         if (queue->is_eos)
2427           goto out_eos;
2428         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2429         GST_QUEUE2_MUTEX_UNLOCK (queue);
2430         gst_queue2_post_buffering (queue);
2431       } else {
2432         /* non-serialized events are passed upstream. */
2433         ret = gst_pad_push_event (queue->srcpad, event);
2434       }
2435       break;
2436   }
2437   return ret;
2438
2439   /* ERRORS */
2440 out_flushing:
2441   {
2442     GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2443     GST_QUEUE2_MUTEX_UNLOCK (queue);
2444     gst_event_unref (event);
2445     return FALSE;
2446   }
2447 out_eos:
2448   {
2449     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2450     GST_QUEUE2_MUTEX_UNLOCK (queue);
2451     gst_event_unref (event);
2452     return FALSE;
2453   }
2454 out_flow_error:
2455   {
2456     GST_LOG_OBJECT (queue,
2457         "refusing event, we have a downstream flow error: %s",
2458         gst_flow_get_name (queue->srcresult));
2459     GST_QUEUE2_MUTEX_UNLOCK (queue);
2460     gst_event_unref (event);
2461     return FALSE;
2462   }
2463 }
2464
2465 static gboolean
2466 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2467     GstQuery * query)
2468 {
2469   GstQueue2 *queue;
2470   gboolean res;
2471
2472   queue = GST_QUEUE2 (parent);
2473
2474   switch (GST_QUERY_TYPE (query)) {
2475     default:
2476       if (GST_QUERY_IS_SERIALIZED (query)) {
2477         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2478         /* serialized events go in the queue. We need to be certain that we
2479          * don't cause deadlocks waiting for the query return value. We check if
2480          * the queue is empty (nothing is blocking downstream and the query can
2481          * be pushed for sure) or we are not buffering. If we are buffering,
2482          * the pipeline waits to unblock downstream until our queue fills up
2483          * completely, which can not happen if we block on the query..
2484          * Therefore we only potentially block when we are not buffering. */
2485         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2486         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2487                 || !queue->use_buffering)) {
2488           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2489             gst_queue2_locked_enqueue (queue, query,
2490                 GST_QUEUE2_ITEM_TYPE_QUERY);
2491
2492             STATUS (queue, queue->sinkpad, "wait for QUERY");
2493             g_cond_wait (&queue->query_handled, &queue->qlock);
2494             if (queue->sinkresult != GST_FLOW_OK)
2495               goto out_flushing;
2496             res = queue->last_query;
2497           } else {
2498             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2499             res = FALSE;
2500           }
2501         } else {
2502           GST_DEBUG_OBJECT (queue,
2503               "refusing query, we are not using the queue");
2504           res = FALSE;
2505         }
2506         GST_QUEUE2_MUTEX_UNLOCK (queue);
2507         gst_queue2_post_buffering (queue);
2508       } else {
2509         res = gst_pad_query_default (pad, parent, query);
2510       }
2511       break;
2512   }
2513   return res;
2514
2515   /* ERRORS */
2516 out_flushing:
2517   {
2518     GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
2519     GST_QUEUE2_MUTEX_UNLOCK (queue);
2520     return FALSE;
2521   }
2522 }
2523
2524 static gboolean
2525 gst_queue2_is_empty (GstQueue2 * queue)
2526 {
2527   /* never empty on EOS */
2528   if (queue->is_eos)
2529     return FALSE;
2530
2531   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2532     return queue->current->writing_pos <= queue->current->max_reading_pos;
2533   } else {
2534     if (queue->queue.length == 0)
2535       return TRUE;
2536   }
2537
2538   return FALSE;
2539 }
2540
2541 static gboolean
2542 gst_queue2_is_filled (GstQueue2 * queue)
2543 {
2544   gboolean res;
2545
2546   /* always filled on EOS */
2547   if (queue->is_eos)
2548     return TRUE;
2549
2550 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2551     (queue->cur_level.format) >= ((alt_max) ? \
2552       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2553
2554   /* if using a ring buffer we're filled if all ring buffer space is used
2555    * _by the current range_ */
2556   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2557     guint64 rb_size = queue->ring_buffer_max_size;
2558     GST_DEBUG_OBJECT (queue,
2559         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2560         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2561     return CHECK_FILLED (bytes, rb_size);
2562   }
2563
2564   /* if using file, we're never filled if we don't have EOS */
2565   if (QUEUE_IS_USING_TEMP_FILE (queue))
2566     return FALSE;
2567
2568   /* we are never filled when we have no buffers at all */
2569   if (queue->cur_level.buffers == 0)
2570     return FALSE;
2571
2572   /* we are filled if one of the current levels exceeds the max */
2573   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2574       || CHECK_FILLED (time, 0);
2575
2576   /* if we need to, use the rate estimate to check against the max time we are
2577    * allowed to queue */
2578   if (queue->use_rate_estimate)
2579     res |= CHECK_FILLED (rate_time, 0);
2580
2581 #undef CHECK_FILLED
2582   return res;
2583 }
2584
2585 static GstFlowReturn
2586 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2587     GstMiniObject * item, GstQueue2ItemType item_type)
2588 {
2589   /* we have to lock the queue since we span threads */
2590   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2591   /* when we received EOS, we refuse more data */
2592   if (queue->is_eos)
2593     goto out_eos;
2594   /* when we received unexpected from downstream, refuse more buffers */
2595   if (queue->unexpected)
2596     goto out_unexpected;
2597
2598   /* while we didn't receive the newsegment, we're seeking and we skip data */
2599   if (queue->seeking)
2600     goto out_seeking;
2601
2602   if (!gst_queue2_wait_free_space (queue))
2603     goto out_flushing;
2604
2605   /* put buffer in queue now */
2606   gst_queue2_locked_enqueue (queue, item, item_type);
2607   GST_QUEUE2_MUTEX_UNLOCK (queue);
2608   gst_queue2_post_buffering (queue);
2609
2610   return GST_FLOW_OK;
2611
2612   /* special conditions */
2613 out_flushing:
2614   {
2615     GstFlowReturn ret = queue->sinkresult;
2616
2617     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2618         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2619     GST_QUEUE2_MUTEX_UNLOCK (queue);
2620     gst_mini_object_unref (item);
2621
2622     return ret;
2623   }
2624 out_eos:
2625   {
2626     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2627     GST_QUEUE2_MUTEX_UNLOCK (queue);
2628     gst_mini_object_unref (item);
2629
2630     return GST_FLOW_EOS;
2631   }
2632 out_seeking:
2633   {
2634     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2635     GST_QUEUE2_MUTEX_UNLOCK (queue);
2636     gst_mini_object_unref (item);
2637
2638     return GST_FLOW_OK;
2639   }
2640 out_unexpected:
2641   {
2642     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2643     GST_QUEUE2_MUTEX_UNLOCK (queue);
2644     gst_mini_object_unref (item);
2645
2646     return GST_FLOW_EOS;
2647   }
2648 }
2649
2650 static GstFlowReturn
2651 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2652 {
2653   GstQueue2 *queue;
2654
2655   queue = GST_QUEUE2 (parent);
2656
2657   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2658       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2659       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2660       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2661       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2662
2663   return gst_queue2_chain_buffer_or_buffer_list (queue,
2664       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2665 }
2666
2667 static GstFlowReturn
2668 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2669     GstBufferList * buffer_list)
2670 {
2671   GstQueue2 *queue;
2672
2673   queue = GST_QUEUE2 (parent);
2674
2675   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2676       "received buffer list %p", buffer_list);
2677
2678   return gst_queue2_chain_buffer_or_buffer_list (queue,
2679       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2680 }
2681
2682 static GstMiniObject *
2683 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2684 {
2685   GstMiniObject *data;
2686
2687   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2688
2689   /* stop pushing buffers, we dequeue all items until we see an item that we
2690    * can push again, which is EOS or SEGMENT. If there is nothing in the
2691    * queue we can push, we set a flag to make the sinkpad refuse more
2692    * buffers with an EOS return value until we receive something
2693    * pushable again or we get flushed. */
2694   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2695     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2696       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2697           "dropping EOS buffer %p", data);
2698       gst_buffer_unref (GST_BUFFER_CAST (data));
2699     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2700       GstEvent *event = GST_EVENT_CAST (data);
2701       GstEventType type = GST_EVENT_TYPE (event);
2702
2703       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2704         /* we found a pushable item in the queue, push it out */
2705         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2706             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2707         return data;
2708       }
2709       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2710           "dropping EOS event %p", event);
2711       gst_event_unref (event);
2712     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2713       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2714           "dropping EOS buffer list %p", data);
2715       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2716     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2717       queue->last_query = FALSE;
2718       g_cond_signal (&queue->query_handled);
2719       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2720     }
2721   }
2722   /* no more items in the queue. Set the unexpected flag so that upstream
2723    * make us refuse any more buffers on the sinkpad. Since we will still
2724    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2725    * task function does not shut down. */
2726   queue->unexpected = TRUE;
2727   return NULL;
2728 }
2729
2730 /* dequeue an item from the queue an push it downstream. This functions returns
2731  * the result of the push. */
2732 static GstFlowReturn
2733 gst_queue2_push_one (GstQueue2 * queue)
2734 {
2735   GstFlowReturn result = queue->srcresult;
2736   GstMiniObject *data;
2737   GstQueue2ItemType item_type;
2738
2739   data = gst_queue2_locked_dequeue (queue, &item_type);
2740   if (data == NULL)
2741     goto no_item;
2742
2743 next:
2744   STATUS (queue, queue->srcpad, "We have something dequeud");
2745   g_atomic_int_set (&queue->downstream_may_block,
2746       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2747       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2748   GST_QUEUE2_MUTEX_UNLOCK (queue);
2749   gst_queue2_post_buffering (queue);
2750
2751   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2752     GstBuffer *buffer;
2753
2754     buffer = GST_BUFFER_CAST (data);
2755
2756     result = gst_pad_push (queue->srcpad, buffer);
2757     g_atomic_int_set (&queue->downstream_may_block, 0);
2758
2759     /* need to check for srcresult here as well */
2760     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2761     if (result == GST_FLOW_EOS) {
2762       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2763       if (data != NULL)
2764         goto next;
2765       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2766        * to the caller so that the task function does not shut down */
2767       result = GST_FLOW_OK;
2768     }
2769   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2770     GstEvent *event = GST_EVENT_CAST (data);
2771     GstEventType type = GST_EVENT_TYPE (event);
2772
2773     gst_pad_push_event (queue->srcpad, event);
2774
2775     /* if we're EOS, return EOS so that the task pauses. */
2776     if (type == GST_EVENT_EOS) {
2777       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2778           "pushed EOS event %p, return EOS", event);
2779       result = GST_FLOW_EOS;
2780     }
2781
2782     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2783   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2784     GstBufferList *buffer_list;
2785
2786     buffer_list = GST_BUFFER_LIST_CAST (data);
2787
2788     result = gst_pad_push_list (queue->srcpad, buffer_list);
2789     g_atomic_int_set (&queue->downstream_may_block, 0);
2790
2791     /* need to check for srcresult here as well */
2792     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2793     if (result == GST_FLOW_EOS) {
2794       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2795       if (data != NULL)
2796         goto next;
2797       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2798        * to the caller so that the task function does not shut down */
2799       result = GST_FLOW_OK;
2800     }
2801   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2802     GstQuery *query = GST_QUERY_CAST (data);
2803
2804     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2805     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2806     GST_LOG_OBJECT (queue->srcpad, "Peered query");
2807     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2808         "did query %p, return %d", query, queue->last_query);
2809     g_cond_signal (&queue->query_handled);
2810     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2811     result = GST_FLOW_OK;
2812   }
2813   return result;
2814
2815   /* ERRORS */
2816 no_item:
2817   {
2818     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2819         "exit because we have no item in the queue");
2820     return GST_FLOW_ERROR;
2821   }
2822 out_flushing:
2823   {
2824     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2825     return GST_FLOW_FLUSHING;
2826   }
2827 }
2828
2829 /* called repeatedly with @pad as the source pad. This function should push out
2830  * data to the peer element. */
2831 static void
2832 gst_queue2_loop (GstPad * pad)
2833 {
2834   GstQueue2 *queue;
2835   GstFlowReturn ret;
2836
2837   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2838
2839   /* have to lock for thread-safety */
2840   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2841
2842   if (gst_queue2_is_empty (queue)) {
2843     gboolean started;
2844
2845     /* pause the timer while we wait. The fact that we are waiting does not mean
2846      * the byterate on the output pad is lower */
2847     if ((started = queue->out_timer_started))
2848       g_timer_stop (queue->out_timer);
2849
2850     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2851         "queue is empty, waiting for new data");
2852     do {
2853       /* Wait for data to be available, we could be unlocked because of a flush. */
2854       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2855     }
2856     while (gst_queue2_is_empty (queue));
2857
2858     /* and continue if we were running before */
2859     if (started)
2860       g_timer_continue (queue->out_timer);
2861   }
2862   ret = gst_queue2_push_one (queue);
2863   queue->srcresult = ret;
2864   queue->sinkresult = ret;
2865   if (ret != GST_FLOW_OK)
2866     goto out_flushing;
2867
2868   GST_QUEUE2_MUTEX_UNLOCK (queue);
2869   gst_queue2_post_buffering (queue);
2870
2871   return;
2872
2873   /* ERRORS */
2874 out_flushing:
2875   {
2876     gboolean eos = queue->is_eos;
2877     GstFlowReturn ret = queue->srcresult;
2878
2879     gst_pad_pause_task (queue->srcpad);
2880     if (ret == GST_FLOW_FLUSHING) {
2881       gst_queue2_locked_flush (queue, FALSE, FALSE);
2882     } else {
2883       GST_QUEUE2_SIGNAL_DEL (queue);
2884       queue->last_query = FALSE;
2885       g_cond_signal (&queue->query_handled);
2886     }
2887     GST_QUEUE2_MUTEX_UNLOCK (queue);
2888     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2889         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
2890     /* let app know about us giving up if upstream is not expected to do so */
2891     /* EOS is already taken care of elsewhere */
2892     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
2893       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2894           (_("Internal data flow error.")),
2895           ("streaming task paused, reason %s (%d)",
2896               gst_flow_get_name (ret), ret));
2897       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
2898     }
2899     return;
2900   }
2901 }
2902
2903 static gboolean
2904 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2905 {
2906   gboolean res = TRUE;
2907   GstQueue2 *queue = GST_QUEUE2 (parent);
2908
2909 #ifndef GST_DISABLE_GST_DEBUG
2910   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
2911       event, GST_EVENT_TYPE_NAME (event));
2912 #endif
2913
2914   switch (GST_EVENT_TYPE (event)) {
2915     case GST_EVENT_FLUSH_START:
2916       if (QUEUE_IS_USING_QUEUE (queue)) {
2917         /* just forward upstream */
2918         res = gst_pad_push_event (queue->sinkpad, event);
2919       } else {
2920         /* now unblock the getrange function */
2921         GST_QUEUE2_MUTEX_LOCK (queue);
2922         GST_DEBUG_OBJECT (queue, "flushing");
2923         queue->srcresult = GST_FLOW_FLUSHING;
2924         GST_QUEUE2_SIGNAL_ADD (queue);
2925         GST_QUEUE2_MUTEX_UNLOCK (queue);
2926
2927         /* when using a temp file, we eat the event */
2928         res = TRUE;
2929         gst_event_unref (event);
2930       }
2931       break;
2932     case GST_EVENT_FLUSH_STOP:
2933       if (QUEUE_IS_USING_QUEUE (queue)) {
2934         /* just forward upstream */
2935         res = gst_pad_push_event (queue->sinkpad, event);
2936       } else {
2937         /* now unblock the getrange function */
2938         GST_QUEUE2_MUTEX_LOCK (queue);
2939         queue->srcresult = GST_FLOW_OK;
2940         GST_QUEUE2_MUTEX_UNLOCK (queue);
2941
2942         /* when using a temp file, we eat the event */
2943         res = TRUE;
2944         gst_event_unref (event);
2945       }
2946       break;
2947     case GST_EVENT_RECONFIGURE:
2948       GST_QUEUE2_MUTEX_LOCK (queue);
2949       /* assume downstream is linked now and try to push again */
2950       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
2951         queue->srcresult = GST_FLOW_OK;
2952         queue->sinkresult = GST_FLOW_OK;
2953         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
2954           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
2955               NULL);
2956         }
2957       }
2958       GST_QUEUE2_MUTEX_UNLOCK (queue);
2959
2960       res = gst_pad_push_event (queue->sinkpad, event);
2961       break;
2962     default:
2963       res = gst_pad_push_event (queue->sinkpad, event);
2964       break;
2965   }
2966
2967   return res;
2968 }
2969
2970 static gboolean
2971 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2972 {
2973   GstQueue2 *queue;
2974
2975   queue = GST_QUEUE2 (parent);
2976
2977   switch (GST_QUERY_TYPE (query)) {
2978     case GST_QUERY_POSITION:
2979     {
2980       gint64 peer_pos;
2981       GstFormat format;
2982
2983       if (!gst_pad_peer_query (queue->sinkpad, query))
2984         goto peer_failed;
2985
2986       /* get peer position */
2987       gst_query_parse_position (query, &format, &peer_pos);
2988
2989       /* FIXME: this code assumes that there's no discont in the queue */
2990       switch (format) {
2991         case GST_FORMAT_BYTES:
2992           peer_pos -= queue->cur_level.bytes;
2993           break;
2994         case GST_FORMAT_TIME:
2995           peer_pos -= queue->cur_level.time;
2996           break;
2997         default:
2998           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
2999               "know how to adjust value", gst_format_get_name (format));
3000           return FALSE;
3001       }
3002       /* set updated position */
3003       gst_query_set_position (query, format, peer_pos);
3004       break;
3005     }
3006     case GST_QUERY_DURATION:
3007     {
3008       GST_DEBUG_OBJECT (queue, "doing peer query");
3009
3010       if (!gst_pad_peer_query (queue->sinkpad, query))
3011         goto peer_failed;
3012
3013       GST_DEBUG_OBJECT (queue, "peer query success");
3014       break;
3015     }
3016     case GST_QUERY_BUFFERING:
3017     {
3018       gint percent;
3019       gboolean is_buffering;
3020       GstBufferingMode mode;
3021       gint avg_in, avg_out;
3022       gint64 buffering_left;
3023
3024       GST_DEBUG_OBJECT (queue, "query buffering");
3025
3026       get_buffering_percent (queue, &is_buffering, &percent);
3027       gst_query_set_buffering_percent (query, is_buffering, percent);
3028
3029       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3030           &buffering_left);
3031       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3032           buffering_left);
3033
3034       if (!QUEUE_IS_USING_QUEUE (queue)) {
3035         /* add ranges for download and ringbuffer buffering */
3036         GstFormat format;
3037         gint64 start, stop, range_start, range_stop;
3038         guint64 writing_pos;
3039         gint64 estimated_total;
3040         gint64 duration;
3041         gboolean peer_res, is_eos;
3042         GstQueue2Range *queued_ranges;
3043
3044         /* we need a current download region */
3045         if (queue->current == NULL)
3046           return FALSE;
3047
3048         writing_pos = queue->current->writing_pos;
3049         is_eos = queue->is_eos;
3050
3051         if (is_eos) {
3052           /* we're EOS, we know the duration in bytes now */
3053           peer_res = TRUE;
3054           duration = writing_pos;
3055         } else {
3056           /* get duration of upstream in bytes */
3057           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3058               GST_FORMAT_BYTES, &duration);
3059         }
3060
3061         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3062             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3063
3064         /* calculate remaining and total download time */
3065         if (peer_res && avg_in > 0.0)
3066           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3067         else
3068           estimated_total = -1;
3069
3070         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3071             estimated_total);
3072
3073         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3074
3075         switch (format) {
3076           case GST_FORMAT_PERCENT:
3077             /* we need duration */
3078             if (!peer_res)
3079               goto peer_failed;
3080
3081             start = 0;
3082             /* get our available data relative to the duration */
3083             if (duration != -1)
3084               stop =
3085                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3086                   duration);
3087             else
3088               stop = -1;
3089             break;
3090           case GST_FORMAT_BYTES:
3091             start = 0;
3092             stop = writing_pos;
3093             break;
3094           default:
3095             start = -1;
3096             stop = -1;
3097             break;
3098         }
3099
3100         /* fill out the buffered ranges */
3101         for (queued_ranges = queue->ranges; queued_ranges;
3102             queued_ranges = queued_ranges->next) {
3103           switch (format) {
3104             case GST_FORMAT_PERCENT:
3105               if (duration == -1) {
3106                 range_start = 0;
3107                 range_stop = 0;
3108                 break;
3109               }
3110               range_start =
3111                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3112                   queued_ranges->offset, duration);
3113               range_stop =
3114                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3115                   queued_ranges->writing_pos, duration);
3116               break;
3117             case GST_FORMAT_BYTES:
3118               range_start = queued_ranges->offset;
3119               range_stop = queued_ranges->writing_pos;
3120               break;
3121             default:
3122               range_start = -1;
3123               range_stop = -1;
3124               break;
3125           }
3126           if (range_start == range_stop)
3127             continue;
3128           GST_DEBUG_OBJECT (queue,
3129               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3130               G_GINT64_FORMAT, range_start, range_stop);
3131           gst_query_add_buffering_range (query, range_start, range_stop);
3132         }
3133
3134         gst_query_set_buffering_range (query, format, start, stop,
3135             estimated_total);
3136       }
3137       break;
3138     }
3139     case GST_QUERY_SCHEDULING:
3140     {
3141       gboolean pull_mode;
3142       GstSchedulingFlags flags = 0;
3143
3144       if (!gst_pad_peer_query (queue->sinkpad, query))
3145         goto peer_failed;
3146
3147       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3148
3149       /* we can operate in pull mode when we are using a tempfile */
3150       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3151
3152       if (pull_mode)
3153         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3154       gst_query_set_scheduling (query, flags, 0, -1, 0);
3155       if (pull_mode)
3156         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3157       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3158       break;
3159     }
3160     default:
3161       /* peer handled other queries */
3162       if (!gst_pad_query_default (pad, parent, query))
3163         goto peer_failed;
3164       break;
3165   }
3166
3167   return TRUE;
3168
3169   /* ERRORS */
3170 peer_failed:
3171   {
3172     GST_DEBUG_OBJECT (queue, "failed peer query");
3173     return FALSE;
3174   }
3175 }
3176
3177 static gboolean
3178 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3179 {
3180   GstQueue2 *queue = GST_QUEUE2 (element);
3181
3182   /* simply forward to the srcpad query function */
3183   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3184       query);
3185 }
3186
3187 static void
3188 gst_queue2_update_upstream_size (GstQueue2 * queue)
3189 {
3190   gint64 upstream_size = -1;
3191
3192   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3193           &upstream_size)) {
3194     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3195
3196     /* upstream_size can be negative but queue->upstream_size is unsigned.
3197      * Prevent setting negative values to it (the query can return -1) */
3198     if (upstream_size >= 0)
3199       queue->upstream_size = upstream_size;
3200     else
3201       queue->upstream_size = 0;
3202   }
3203 }
3204
3205 static GstFlowReturn
3206 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3207     guint length, GstBuffer ** buffer)
3208 {
3209   GstQueue2 *queue;
3210   GstFlowReturn ret;
3211
3212   queue = GST_QUEUE2_CAST (parent);
3213
3214   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3215   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3216   offset = (offset == -1) ? queue->current->reading_pos : offset;
3217
3218   GST_DEBUG_OBJECT (queue,
3219       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3220
3221   /* catch any reads beyond the size of the file here to make sure queue2
3222    * doesn't send seek events beyond the size of the file upstream, since
3223    * that would confuse elements such as souphttpsrc and/or http servers.
3224    * Demuxers often just loop until EOS at the end of the file to figure out
3225    * when they've read all the end-headers or index chunks. */
3226   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3227     gst_queue2_update_upstream_size (queue);
3228     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3229       goto out_unexpected;
3230   }
3231
3232   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3233     gst_queue2_update_upstream_size (queue);
3234     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3235       length = queue->upstream_size - offset;
3236       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3237     }
3238   }
3239
3240   /* FIXME - function will block when the range is not yet available */
3241   ret = gst_queue2_create_read (queue, offset, length, buffer);
3242   GST_QUEUE2_MUTEX_UNLOCK (queue);
3243   gst_queue2_post_buffering (queue);
3244
3245   return ret;
3246
3247   /* ERRORS */
3248 out_flushing:
3249   {
3250     ret = queue->srcresult;
3251
3252     GST_DEBUG_OBJECT (queue, "we are flushing");
3253     GST_QUEUE2_MUTEX_UNLOCK (queue);
3254     return ret;
3255   }
3256 out_unexpected:
3257   {
3258     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3259     GST_QUEUE2_MUTEX_UNLOCK (queue);
3260     return GST_FLOW_EOS;
3261   }
3262 }
3263
3264 /* sink currently only operates in push mode */
3265 static gboolean
3266 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3267     GstPadMode mode, gboolean active)
3268 {
3269   gboolean result;
3270   GstQueue2 *queue;
3271
3272   queue = GST_QUEUE2 (parent);
3273
3274   switch (mode) {
3275     case GST_PAD_MODE_PUSH:
3276       if (active) {
3277         GST_QUEUE2_MUTEX_LOCK (queue);
3278         GST_DEBUG_OBJECT (queue, "activating push mode");
3279         queue->srcresult = GST_FLOW_OK;
3280         queue->sinkresult = GST_FLOW_OK;
3281         queue->is_eos = FALSE;
3282         queue->unexpected = FALSE;
3283         reset_rate_timer (queue);
3284         GST_QUEUE2_MUTEX_UNLOCK (queue);
3285       } else {
3286         /* unblock chain function */
3287         GST_QUEUE2_MUTEX_LOCK (queue);
3288         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3289         queue->srcresult = GST_FLOW_FLUSHING;
3290         queue->sinkresult = GST_FLOW_FLUSHING;
3291         GST_QUEUE2_SIGNAL_DEL (queue);
3292         /* Unblock query handler */
3293         queue->last_query = FALSE;
3294         g_cond_signal (&queue->query_handled);
3295         GST_QUEUE2_MUTEX_UNLOCK (queue);
3296
3297         /* wait until it is unblocked and clean up */
3298         GST_PAD_STREAM_LOCK (pad);
3299         GST_QUEUE2_MUTEX_LOCK (queue);
3300         gst_queue2_locked_flush (queue, TRUE, FALSE);
3301         GST_QUEUE2_MUTEX_UNLOCK (queue);
3302         GST_PAD_STREAM_UNLOCK (pad);
3303       }
3304       result = TRUE;
3305       break;
3306     default:
3307       result = FALSE;
3308       break;
3309   }
3310   return result;
3311 }
3312
3313 /* src operating in push mode, we start a task on the source pad that pushes out
3314  * buffers from the queue */
3315 static gboolean
3316 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3317 {
3318   gboolean result = FALSE;
3319   GstQueue2 *queue;
3320
3321   queue = GST_QUEUE2 (parent);
3322
3323   if (active) {
3324     GST_QUEUE2_MUTEX_LOCK (queue);
3325     GST_DEBUG_OBJECT (queue, "activating push mode");
3326     queue->srcresult = GST_FLOW_OK;
3327     queue->sinkresult = GST_FLOW_OK;
3328     queue->is_eos = FALSE;
3329     queue->unexpected = FALSE;
3330     result =
3331         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3332     GST_QUEUE2_MUTEX_UNLOCK (queue);
3333   } else {
3334     /* unblock loop function */
3335     GST_QUEUE2_MUTEX_LOCK (queue);
3336     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3337     queue->srcresult = GST_FLOW_FLUSHING;
3338     queue->sinkresult = GST_FLOW_FLUSHING;
3339     /* the item add signal will unblock */
3340     GST_QUEUE2_SIGNAL_ADD (queue);
3341     GST_QUEUE2_MUTEX_UNLOCK (queue);
3342
3343     /* step 2, make sure streaming finishes */
3344     result = gst_pad_stop_task (pad);
3345   }
3346
3347   return result;
3348 }
3349
3350 /* pull mode, downstream will call our getrange function */
3351 static gboolean
3352 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3353 {
3354   gboolean result;
3355   GstQueue2 *queue;
3356
3357   queue = GST_QUEUE2 (parent);
3358
3359   if (active) {
3360     GST_QUEUE2_MUTEX_LOCK (queue);
3361     if (!QUEUE_IS_USING_QUEUE (queue)) {
3362       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3363         /* open the temp file now */
3364         result = gst_queue2_open_temp_location_file (queue);
3365       } else if (!queue->ring_buffer) {
3366         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3367         result = ! !queue->ring_buffer;
3368       } else {
3369         result = TRUE;
3370       }
3371
3372       GST_DEBUG_OBJECT (queue, "activating pull mode");
3373       init_ranges (queue);
3374       queue->srcresult = GST_FLOW_OK;
3375       queue->sinkresult = GST_FLOW_OK;
3376       queue->is_eos = FALSE;
3377       queue->unexpected = FALSE;
3378       queue->upstream_size = 0;
3379     } else {
3380       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3381       /* this is not allowed, we cannot operate in pull mode without a temp
3382        * file. */
3383       queue->srcresult = GST_FLOW_FLUSHING;
3384       queue->sinkresult = GST_FLOW_FLUSHING;
3385       result = FALSE;
3386     }
3387     GST_QUEUE2_MUTEX_UNLOCK (queue);
3388   } else {
3389     GST_QUEUE2_MUTEX_LOCK (queue);
3390     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3391     queue->srcresult = GST_FLOW_FLUSHING;
3392     queue->sinkresult = GST_FLOW_FLUSHING;
3393     /* this will unlock getrange */
3394     GST_QUEUE2_SIGNAL_ADD (queue);
3395     result = TRUE;
3396     GST_QUEUE2_MUTEX_UNLOCK (queue);
3397   }
3398
3399   return result;
3400 }
3401
3402 static gboolean
3403 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3404     gboolean active)
3405 {
3406   gboolean res;
3407
3408   switch (mode) {
3409     case GST_PAD_MODE_PULL:
3410       res = gst_queue2_src_activate_pull (pad, parent, active);
3411       break;
3412     case GST_PAD_MODE_PUSH:
3413       res = gst_queue2_src_activate_push (pad, parent, active);
3414       break;
3415     default:
3416       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3417       res = FALSE;
3418       break;
3419   }
3420   return res;
3421 }
3422
3423 static GstStateChangeReturn
3424 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3425 {
3426   GstQueue2 *queue;
3427   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3428
3429   queue = GST_QUEUE2 (element);
3430
3431   switch (transition) {
3432     case GST_STATE_CHANGE_NULL_TO_READY:
3433       break;
3434     case GST_STATE_CHANGE_READY_TO_PAUSED:
3435       GST_QUEUE2_MUTEX_LOCK (queue);
3436       if (!QUEUE_IS_USING_QUEUE (queue)) {
3437         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3438           if (!gst_queue2_open_temp_location_file (queue))
3439             ret = GST_STATE_CHANGE_FAILURE;
3440         } else {
3441           if (queue->ring_buffer) {
3442             g_free (queue->ring_buffer);
3443             queue->ring_buffer = NULL;
3444           }
3445           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3446             ret = GST_STATE_CHANGE_FAILURE;
3447         }
3448         init_ranges (queue);
3449       }
3450       queue->segment_event_received = FALSE;
3451       queue->starting_segment = NULL;
3452       gst_event_replace (&queue->stream_start_event, NULL);
3453       GST_QUEUE2_MUTEX_UNLOCK (queue);
3454       break;
3455     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3456       break;
3457     default:
3458       break;
3459   }
3460
3461   if (ret == GST_STATE_CHANGE_FAILURE)
3462     return ret;
3463
3464   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3465
3466   if (ret == GST_STATE_CHANGE_FAILURE)
3467     return ret;
3468
3469   switch (transition) {
3470     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3471       break;
3472     case GST_STATE_CHANGE_PAUSED_TO_READY:
3473       GST_QUEUE2_MUTEX_LOCK (queue);
3474       if (!QUEUE_IS_USING_QUEUE (queue)) {
3475         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3476           gst_queue2_close_temp_location_file (queue);
3477         } else if (queue->ring_buffer) {
3478           g_free (queue->ring_buffer);
3479           queue->ring_buffer = NULL;
3480         }
3481         clean_ranges (queue);
3482       }
3483       if (queue->starting_segment != NULL) {
3484         gst_event_unref (queue->starting_segment);
3485         queue->starting_segment = NULL;
3486       }
3487       gst_event_replace (&queue->stream_start_event, NULL);
3488       GST_QUEUE2_MUTEX_UNLOCK (queue);
3489       break;
3490     case GST_STATE_CHANGE_READY_TO_NULL:
3491       break;
3492     default:
3493       break;
3494   }
3495
3496   return ret;
3497 }
3498
3499 /* changing the capacity of the queue must wake up
3500  * the _chain function, it might have more room now
3501  * to store the buffer/event in the queue */
3502 #define QUEUE_CAPACITY_CHANGE(q) \
3503   GST_QUEUE2_SIGNAL_DEL (queue); \
3504   if (queue->use_buffering)      \
3505     update_buffering (queue);
3506
3507 /* Changing the minimum required fill level must
3508  * wake up the _loop function as it might now
3509  * be able to preceed.
3510  */
3511 #define QUEUE_THRESHOLD_CHANGE(q)\
3512   GST_QUEUE2_SIGNAL_ADD (queue);
3513
3514 static void
3515 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3516 {
3517   GstState state;
3518
3519   /* the element must be stopped in order to do this */
3520   GST_OBJECT_LOCK (queue);
3521   state = GST_STATE (queue);
3522   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3523     goto wrong_state;
3524   GST_OBJECT_UNLOCK (queue);
3525
3526   /* set new location */
3527   g_free (queue->temp_template);
3528   queue->temp_template = g_strdup (template);
3529
3530   return;
3531
3532 /* ERROR */
3533 wrong_state:
3534   {
3535     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3536     GST_OBJECT_UNLOCK (queue);
3537   }
3538 }
3539
3540 static void
3541 gst_queue2_set_property (GObject * object,
3542     guint prop_id, const GValue * value, GParamSpec * pspec)
3543 {
3544   GstQueue2 *queue = GST_QUEUE2 (object);
3545
3546   /* someone could change levels here, and since this
3547    * affects the get/put funcs, we need to lock for safety. */
3548   GST_QUEUE2_MUTEX_LOCK (queue);
3549
3550   switch (prop_id) {
3551     case PROP_MAX_SIZE_BYTES:
3552       queue->max_level.bytes = g_value_get_uint (value);
3553       QUEUE_CAPACITY_CHANGE (queue);
3554       break;
3555     case PROP_MAX_SIZE_BUFFERS:
3556       queue->max_level.buffers = g_value_get_uint (value);
3557       QUEUE_CAPACITY_CHANGE (queue);
3558       break;
3559     case PROP_MAX_SIZE_TIME:
3560       queue->max_level.time = g_value_get_uint64 (value);
3561       /* set rate_time to the same value. We use an extra field in the level
3562        * structure so that we can easily access and compare it */
3563       queue->max_level.rate_time = queue->max_level.time;
3564       QUEUE_CAPACITY_CHANGE (queue);
3565       break;
3566     case PROP_USE_BUFFERING:
3567       queue->use_buffering = g_value_get_boolean (value);
3568       if (!queue->use_buffering && queue->is_buffering) {
3569         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3570             "posting 100%% message");
3571         SET_PERCENT (queue, 100);
3572         queue->is_buffering = FALSE;
3573       }
3574
3575       if (queue->use_buffering) {
3576         queue->is_buffering = TRUE;
3577         update_buffering (queue);
3578       }
3579       break;
3580     case PROP_USE_RATE_ESTIMATE:
3581       queue->use_rate_estimate = g_value_get_boolean (value);
3582       break;
3583     case PROP_LOW_PERCENT:
3584       queue->low_percent = g_value_get_int (value);
3585       break;
3586     case PROP_HIGH_PERCENT:
3587       queue->high_percent = g_value_get_int (value);
3588       break;
3589     case PROP_TEMP_TEMPLATE:
3590       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3591       break;
3592     case PROP_TEMP_REMOVE:
3593       queue->temp_remove = g_value_get_boolean (value);
3594       break;
3595     case PROP_RING_BUFFER_MAX_SIZE:
3596       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3597       break;
3598     default:
3599       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3600       break;
3601   }
3602
3603   GST_QUEUE2_MUTEX_UNLOCK (queue);
3604   gst_queue2_post_buffering (queue);
3605 }
3606
3607 static void
3608 gst_queue2_get_property (GObject * object,
3609     guint prop_id, GValue * value, GParamSpec * pspec)
3610 {
3611   GstQueue2 *queue = GST_QUEUE2 (object);
3612
3613   GST_QUEUE2_MUTEX_LOCK (queue);
3614
3615   switch (prop_id) {
3616     case PROP_CUR_LEVEL_BYTES:
3617       g_value_set_uint (value, queue->cur_level.bytes);
3618       break;
3619     case PROP_CUR_LEVEL_BUFFERS:
3620       g_value_set_uint (value, queue->cur_level.buffers);
3621       break;
3622     case PROP_CUR_LEVEL_TIME:
3623       g_value_set_uint64 (value, queue->cur_level.time);
3624       break;
3625     case PROP_MAX_SIZE_BYTES:
3626       g_value_set_uint (value, queue->max_level.bytes);
3627       break;
3628     case PROP_MAX_SIZE_BUFFERS:
3629       g_value_set_uint (value, queue->max_level.buffers);
3630       break;
3631     case PROP_MAX_SIZE_TIME:
3632       g_value_set_uint64 (value, queue->max_level.time);
3633       break;
3634     case PROP_USE_BUFFERING:
3635       g_value_set_boolean (value, queue->use_buffering);
3636       break;
3637     case PROP_USE_RATE_ESTIMATE:
3638       g_value_set_boolean (value, queue->use_rate_estimate);
3639       break;
3640     case PROP_LOW_PERCENT:
3641       g_value_set_int (value, queue->low_percent);
3642       break;
3643     case PROP_HIGH_PERCENT:
3644       g_value_set_int (value, queue->high_percent);
3645       break;
3646     case PROP_TEMP_TEMPLATE:
3647       g_value_set_string (value, queue->temp_template);
3648       break;
3649     case PROP_TEMP_LOCATION:
3650       g_value_set_string (value, queue->temp_location);
3651       break;
3652     case PROP_TEMP_REMOVE:
3653       g_value_set_boolean (value, queue->temp_remove);
3654       break;
3655     case PROP_RING_BUFFER_MAX_SIZE:
3656       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3657       break;
3658     case PROP_AVG_IN_RATE:
3659     {
3660       gdouble in_rate = queue->byte_in_rate;
3661
3662       /* During the first RATE_INTERVAL, byte_in_rate will not have been
3663        * calculated, so calculate it here. */
3664       if (in_rate == 0.0 && queue->bytes_in
3665           && queue->last_update_in_rates_elapsed > 0.0)
3666         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
3667
3668       g_value_set_int64 (value, (gint64) in_rate);
3669       break;
3670     }
3671     default:
3672       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3673       break;
3674   }
3675
3676   GST_QUEUE2_MUTEX_UNLOCK (queue);
3677 }