queue2: avoid calculating fill levels multiple times
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue2:current-level-buffers property.
40  *
41  * The default queue size limits are 100 buffers, 2MB of data, or
42  * two seconds worth of data, whichever is reached first.
43  *
44  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
45  * will allocate a random free filename and buffer data in the file.
46  * By using this, it will buffer the entire stream data on the file independently
47  * of the queue size limits, they will only be used for buffering statistics.
48  *
49  * The temp-location property will be used to notify the application of the
50  * allocated filename.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56
57 #include "gstqueue2.h"
58
59 #include <glib/gstdio.h>
60
61 #include "gst/gst-i18n-lib.h"
62 #include "gst/glib-compat-private.h"
63
64 #include <string.h>
65
66 #ifdef G_OS_WIN32
67 #include <io.h>                 /* lseek, open, close, read */
68 #undef lseek
69 #define lseek _lseeki64
70 #undef off_t
71 #define off_t guint64
72 #else
73 #include <unistd.h>
74 #endif
75
76 #ifdef __BIONIC__               /* Android */
77 #undef lseek
78 #define lseek lseek64
79 #undef off_t
80 #define off_t guint64
81 #include <fcntl.h>
82 #endif
83
84 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
85     GST_PAD_SINK,
86     GST_PAD_ALWAYS,
87     GST_STATIC_CAPS_ANY);
88
89 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
90     GST_PAD_SRC,
91     GST_PAD_ALWAYS,
92     GST_STATIC_CAPS_ANY);
93
94 GST_DEBUG_CATEGORY_STATIC (queue_debug);
95 #define GST_CAT_DEFAULT (queue_debug)
96 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
97
98 enum
99 {
100   SIGNAL_OVERRUN,
101   LAST_SIGNAL
102 };
103
104 /* other defines */
105 #define DEFAULT_BUFFER_SIZE 4096
106 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
107 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
108 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
109
110 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
111
112 /* default property values */
113 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
114 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
115 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
116 #define DEFAULT_USE_BUFFERING      FALSE
117 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
118 #define DEFAULT_LOW_PERCENT        10
119 #define DEFAULT_HIGH_PERCENT       99
120 #define DEFAULT_TEMP_REMOVE        TRUE
121 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
122
123 enum
124 {
125   PROP_0,
126   PROP_CUR_LEVEL_BUFFERS,
127   PROP_CUR_LEVEL_BYTES,
128   PROP_CUR_LEVEL_TIME,
129   PROP_MAX_SIZE_BUFFERS,
130   PROP_MAX_SIZE_BYTES,
131   PROP_MAX_SIZE_TIME,
132   PROP_USE_BUFFERING,
133   PROP_USE_RATE_ESTIMATE,
134   PROP_LOW_PERCENT,
135   PROP_HIGH_PERCENT,
136   PROP_TEMP_TEMPLATE,
137   PROP_TEMP_LOCATION,
138   PROP_TEMP_REMOVE,
139   PROP_RING_BUFFER_MAX_SIZE,
140   PROP_AVG_IN_RATE,
141   PROP_LAST
142 };
143
144 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
145   l.buffers = 0;                                        \
146   l.bytes = 0;                                          \
147   l.time = 0;                                           \
148   l.rate_time = 0;                                      \
149 } G_STMT_END
150
151 #define STATUS(queue, pad, msg) \
152   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
153                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
154                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
155                       " ns, %"G_GUINT64_FORMAT" items", \
156                       GST_DEBUG_PAD_NAME (pad), \
157                       queue->cur_level.buffers, \
158                       queue->max_level.buffers, \
159                       queue->cur_level.bytes, \
160                       queue->max_level.bytes, \
161                       queue->cur_level.time, \
162                       queue->max_level.time, \
163                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
164                         queue->current->writing_pos - queue->current->max_reading_pos : \
165                         queue->queue.length))
166
167 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
168   g_mutex_lock (&q->qlock);                                              \
169 } G_STMT_END
170
171 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
172   GST_QUEUE2_MUTEX_LOCK (q);                                            \
173   if (res != GST_FLOW_OK)                                               \
174     goto label;                                                         \
175 } G_STMT_END
176
177 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
178   g_mutex_unlock (&q->qlock);                                            \
179 } G_STMT_END
180
181 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
182   STATUS (queue, q->sinkpad, "wait for DEL");                           \
183   q->waiting_del = TRUE;                                                \
184   g_cond_wait (&q->item_del, &queue->qlock);                              \
185   q->waiting_del = FALSE;                                               \
186   if (res != GST_FLOW_OK) {                                             \
187     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
188     goto label;                                                         \
189   }                                                                     \
190   STATUS (queue, q->sinkpad, "received DEL");                           \
191 } G_STMT_END
192
193 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
194   STATUS (queue, q->srcpad, "wait for ADD");                            \
195   q->waiting_add = TRUE;                                                \
196   g_cond_wait (&q->item_add, &q->qlock);                                  \
197   q->waiting_add = FALSE;                                               \
198   if (res != GST_FLOW_OK) {                                             \
199     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
200     goto label;                                                         \
201   }                                                                     \
202   STATUS (queue, q->srcpad, "received ADD");                            \
203 } G_STMT_END
204
205 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
206   if (q->waiting_del) {                                                 \
207     STATUS (q, q->srcpad, "signal DEL");                                \
208     g_cond_signal (&q->item_del);                                        \
209   }                                                                     \
210 } G_STMT_END
211
212 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
213   if (q->waiting_add) {                                                 \
214     STATUS (q, q->sinkpad, "signal ADD");                               \
215     g_cond_signal (&q->item_add);                                        \
216   }                                                                     \
217 } G_STMT_END
218
219 #define SET_PERCENT(q, perc) G_STMT_START {                              \
220   if (perc != q->buffering_percent) {                                    \
221     q->buffering_percent = perc;                                         \
222     q->percent_changed = TRUE;                                           \
223     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
224     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
225         &q->buffering_left);                                             \
226   }                                                                      \
227 } G_STMT_END
228
229 #define _do_init \
230     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
231     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
232         "dataflow inside the queue element");
233 #define gst_queue2_parent_class parent_class
234 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
235
236 static void gst_queue2_finalize (GObject * object);
237
238 static void gst_queue2_set_property (GObject * object,
239     guint prop_id, const GValue * value, GParamSpec * pspec);
240 static void gst_queue2_get_property (GObject * object,
241     guint prop_id, GValue * value, GParamSpec * pspec);
242
243 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
244     GstBuffer * buffer);
245 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
246     GstBufferList * buffer_list);
247 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
248 static void gst_queue2_loop (GstPad * pad);
249
250 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
251     GstEvent * event);
252 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
253     GstQuery * query);
254
255 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
256     GstEvent * event);
257 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
258     GstQuery * query);
259 static gboolean gst_queue2_handle_query (GstElement * element,
260     GstQuery * query);
261
262 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
263     guint64 offset, guint length, GstBuffer ** buffer);
264
265 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
266     GstPadMode mode, gboolean active);
267 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
268     GstPadMode mode, gboolean active);
269 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
270     GstStateChange transition);
271
272 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
273 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
274
275 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
276 static void update_in_rates (GstQueue2 * queue);
277 static void gst_queue2_post_buffering (GstQueue2 * queue);
278
279 typedef enum
280 {
281   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
282   GST_QUEUE2_ITEM_TYPE_BUFFER,
283   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
284   GST_QUEUE2_ITEM_TYPE_EVENT,
285   GST_QUEUE2_ITEM_TYPE_QUERY
286 } GstQueue2ItemType;
287
288 typedef struct
289 {
290   GstQueue2ItemType type;
291   GstMiniObject *item;
292 } GstQueue2Item;
293
294 static guint gst_queue2_signals[LAST_SIGNAL] = { 0 };
295
296 static void
297 gst_queue2_class_init (GstQueue2Class * klass)
298 {
299   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
300   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
301
302   gobject_class->set_property = gst_queue2_set_property;
303   gobject_class->get_property = gst_queue2_get_property;
304
305   /* signals */
306   /**
307    * GstQueue2::overrun:
308    * @queue: the queue2 instance
309    *
310    * Reports that the buffer became full (overrun).
311    * A buffer is full if the total amount of data inside it (num-buffers, time,
312    * size) is higher than the boundary values which can be set through the
313    * GObject properties.
314    *
315    * Since: 1.8
316    */
317   gst_queue2_signals[SIGNAL_OVERRUN] =
318       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
319       G_STRUCT_OFFSET (GstQueue2Class, overrun), NULL, NULL,
320       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
321
322   /* properties */
323   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
324       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
325           "Current amount of data in the queue (bytes)",
326           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
327   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
328       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
329           "Current number of buffers in the queue",
330           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
331   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
332       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
333           "Current amount of data in the queue (in ns)",
334           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
335
336   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
338           "Max. amount of data in the queue (bytes, 0=disable)",
339           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
340           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
341           G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
343       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
344           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
345           DEFAULT_MAX_SIZE_BUFFERS,
346           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
347           G_PARAM_STATIC_STRINGS));
348   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
349       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
350           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
351           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
352           G_PARAM_STATIC_STRINGS));
353
354   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
355       g_param_spec_boolean ("use-buffering", "Use buffering",
356           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
357           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
358           G_PARAM_STATIC_STRINGS));
359   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
360       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
361           "Estimate the bitrate of the stream to calculate time level",
362           DEFAULT_USE_RATE_ESTIMATE,
363           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
365       g_param_spec_int ("low-percent", "Low percent",
366           "Low threshold for buffering to start. Only used if use-buffering is True",
367           0, 100, DEFAULT_LOW_PERCENT,
368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
370       g_param_spec_int ("high-percent", "High percent",
371           "High threshold for buffering to finish. Only used if use-buffering is True",
372           0, 100, DEFAULT_HIGH_PERCENT,
373           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374
375   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
376       g_param_spec_string ("temp-template", "Temporary File Template",
377           "File template to store temporary files in, should contain directory "
378           "and XXXXXX. (NULL == disabled)",
379           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380
381   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
382       g_param_spec_string ("temp-location", "Temporary File Location",
383           "Location to store temporary files in (Only read this property, "
384           "use temp-template to configure the name template)",
385           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
386
387   /**
388    * GstQueue2:temp-remove
389    *
390    * When temp-template is set, remove the temporary file when going to READY.
391    */
392   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
393       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
394           "Remove the temp-location after use",
395           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
396
397   /**
398    * GstQueue2:ring-buffer-max-size
399    *
400    * The maximum size of the ring buffer in bytes. If set to 0, the ring
401    * buffer is disabled. Default 0.
402    */
403   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
404       g_param_spec_uint64 ("ring-buffer-max-size",
405           "Max. ring buffer size (bytes)",
406           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
407           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
408           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409
410   /**
411    * GstQueue2:avg-in-rate
412    *
413    * The average input data rate.
414    */
415   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
416       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
417           "Average input data rate (bytes/s)",
418           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
419
420   /* set several parent class virtual functions */
421   gobject_class->finalize = gst_queue2_finalize;
422
423   gst_element_class_add_pad_template (gstelement_class,
424       gst_static_pad_template_get (&srctemplate));
425   gst_element_class_add_pad_template (gstelement_class,
426       gst_static_pad_template_get (&sinktemplate));
427
428   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
429       "Generic",
430       "Simple data queue",
431       "Erik Walthinsen <omega@cse.ogi.edu>, "
432       "Wim Taymans <wim.taymans@gmail.com>");
433
434   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
435   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
436 }
437
438 static void
439 gst_queue2_init (GstQueue2 * queue)
440 {
441   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
442
443   gst_pad_set_chain_function (queue->sinkpad,
444       GST_DEBUG_FUNCPTR (gst_queue2_chain));
445   gst_pad_set_chain_list_function (queue->sinkpad,
446       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
447   gst_pad_set_activatemode_function (queue->sinkpad,
448       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
449   gst_pad_set_event_function (queue->sinkpad,
450       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
451   gst_pad_set_query_function (queue->sinkpad,
452       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
453   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
454   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
455
456   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
457
458   gst_pad_set_activatemode_function (queue->srcpad,
459       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
460   gst_pad_set_getrange_function (queue->srcpad,
461       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
462   gst_pad_set_event_function (queue->srcpad,
463       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
464   gst_pad_set_query_function (queue->srcpad,
465       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
466   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
467   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
468
469   /* levels */
470   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
471   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
472   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
473   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
474   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
475   queue->use_buffering = DEFAULT_USE_BUFFERING;
476   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
477   queue->low_percent = DEFAULT_LOW_PERCENT;
478   queue->high_percent = DEFAULT_HIGH_PERCENT;
479
480   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
481   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
482
483   queue->sinktime = GST_CLOCK_TIME_NONE;
484   queue->srctime = GST_CLOCK_TIME_NONE;
485   queue->sink_tainted = TRUE;
486   queue->src_tainted = TRUE;
487
488   queue->srcresult = GST_FLOW_FLUSHING;
489   queue->sinkresult = GST_FLOW_FLUSHING;
490   queue->is_eos = FALSE;
491   queue->in_timer = g_timer_new ();
492   queue->out_timer = g_timer_new ();
493
494   g_mutex_init (&queue->qlock);
495   queue->waiting_add = FALSE;
496   g_cond_init (&queue->item_add);
497   queue->waiting_del = FALSE;
498   g_cond_init (&queue->item_del);
499   g_queue_init (&queue->queue);
500
501   g_cond_init (&queue->query_handled);
502   queue->last_query = FALSE;
503
504   g_mutex_init (&queue->buffering_post_lock);
505   queue->buffering_percent = 100;
506
507   /* tempfile related */
508   queue->temp_template = NULL;
509   queue->temp_location = NULL;
510   queue->temp_remove = DEFAULT_TEMP_REMOVE;
511
512   queue->ring_buffer = NULL;
513   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
514
515   GST_DEBUG_OBJECT (queue,
516       "initialized queue's not_empty & not_full conditions");
517 }
518
519 /* called only once, as opposed to dispose */
520 static void
521 gst_queue2_finalize (GObject * object)
522 {
523   GstQueue2 *queue = GST_QUEUE2 (object);
524
525   GST_DEBUG_OBJECT (queue, "finalizing queue");
526
527   while (!g_queue_is_empty (&queue->queue)) {
528     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
529
530     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
531       gst_mini_object_unref (qitem->item);
532     g_slice_free (GstQueue2Item, qitem);
533   }
534
535   queue->last_query = FALSE;
536   g_queue_clear (&queue->queue);
537   g_mutex_clear (&queue->qlock);
538   g_mutex_clear (&queue->buffering_post_lock);
539   g_cond_clear (&queue->item_add);
540   g_cond_clear (&queue->item_del);
541   g_cond_clear (&queue->query_handled);
542   g_timer_destroy (queue->in_timer);
543   g_timer_destroy (queue->out_timer);
544
545   /* temp_file path cleanup  */
546   g_free (queue->temp_template);
547   g_free (queue->temp_location);
548
549   G_OBJECT_CLASS (parent_class)->finalize (object);
550 }
551
552 static void
553 debug_ranges (GstQueue2 * queue)
554 {
555   GstQueue2Range *walk;
556
557   for (walk = queue->ranges; walk; walk = walk->next) {
558     GST_DEBUG_OBJECT (queue,
559         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
560         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
561         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
562         walk->rb_writing_pos, walk->reading_pos,
563         walk == queue->current ? "**y**" : "  n  ");
564   }
565 }
566
567 /* clear all the downloaded ranges */
568 static void
569 clean_ranges (GstQueue2 * queue)
570 {
571   GST_DEBUG_OBJECT (queue, "clean queue ranges");
572
573   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
574   queue->ranges = NULL;
575   queue->current = NULL;
576 }
577
578 /* find a range that contains @offset or NULL when nothing does */
579 static GstQueue2Range *
580 find_range (GstQueue2 * queue, guint64 offset)
581 {
582   GstQueue2Range *range = NULL;
583   GstQueue2Range *walk;
584
585   /* first do a quick check for the current range */
586   for (walk = queue->ranges; walk; walk = walk->next) {
587     if (offset >= walk->offset && offset <= walk->writing_pos) {
588       /* we can reuse an existing range */
589       range = walk;
590       break;
591     }
592   }
593   if (range) {
594     GST_DEBUG_OBJECT (queue,
595         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
596         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
597   } else {
598     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
599   }
600   return range;
601 }
602
603 static void
604 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
605 {
606   guint64 max_reading_pos, writing_pos;
607
608   writing_pos = range->writing_pos;
609   max_reading_pos = range->max_reading_pos;
610
611   if (writing_pos > max_reading_pos)
612     queue->cur_level.bytes = writing_pos - max_reading_pos;
613   else
614     queue->cur_level.bytes = 0;
615 }
616
617 /* make a new range for @offset or reuse an existing range */
618 static GstQueue2Range *
619 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
620 {
621   GstQueue2Range *range, *prev, *next;
622
623   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
624
625   if ((range = find_range (queue, offset))) {
626     GST_DEBUG_OBJECT (queue,
627         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
628         range->writing_pos);
629     if (update_existing && range->writing_pos != offset) {
630       GST_DEBUG_OBJECT (queue, "updating range writing position to "
631           "%" G_GUINT64_FORMAT, offset);
632       range->writing_pos = offset;
633     }
634   } else {
635     GST_DEBUG_OBJECT (queue,
636         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
637
638     range = g_slice_new0 (GstQueue2Range);
639     range->offset = offset;
640     /* we want to write to the next location in the ring buffer */
641     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
642     range->writing_pos = offset;
643     range->rb_writing_pos = range->rb_offset;
644     range->reading_pos = offset;
645     range->max_reading_pos = offset;
646
647     /* insert sorted */
648     prev = NULL;
649     next = queue->ranges;
650     while (next) {
651       if (next->offset > offset) {
652         /* insert before next */
653         GST_DEBUG_OBJECT (queue,
654             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
655             next->offset);
656         break;
657       }
658       /* try next */
659       prev = next;
660       next = next->next;
661     }
662     range->next = next;
663     if (prev)
664       prev->next = range;
665     else
666       queue->ranges = range;
667   }
668   debug_ranges (queue);
669
670   /* update the stats for this range */
671   update_cur_level (queue, range);
672
673   return range;
674 }
675
676
677 /* clear and init the download ranges for offset 0 */
678 static void
679 init_ranges (GstQueue2 * queue)
680 {
681   GST_DEBUG_OBJECT (queue, "init queue ranges");
682
683   /* get rid of all the current ranges */
684   clean_ranges (queue);
685   /* make a range for offset 0 */
686   queue->current = add_range (queue, 0, TRUE);
687 }
688
689 /* calculate the diff between running time on the sink and src of the queue.
690  * This is the total amount of time in the queue. */
691 static void
692 update_time_level (GstQueue2 * queue)
693 {
694   if (queue->sink_tainted) {
695     queue->sinktime =
696         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
697         queue->sink_segment.position);
698     queue->sink_tainted = FALSE;
699   }
700
701   if (queue->src_tainted) {
702     queue->srctime =
703         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
704         queue->src_segment.position);
705     queue->src_tainted = FALSE;
706   }
707
708   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
709       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
710
711   if (queue->sinktime != GST_CLOCK_TIME_NONE
712       && queue->srctime != GST_CLOCK_TIME_NONE
713       && queue->sinktime >= queue->srctime)
714     queue->cur_level.time = queue->sinktime - queue->srctime;
715   else
716     queue->cur_level.time = 0;
717 }
718
719 /* take a SEGMENT event and apply the values to segment, updating the time
720  * level of queue. */
721 static void
722 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
723     gboolean is_sink)
724 {
725   gst_event_copy_segment (event, segment);
726
727   if (segment->format == GST_FORMAT_BYTES) {
728     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
729       /* start is where we'll be getting from and as such writing next */
730       queue->current = add_range (queue, segment->start, TRUE);
731     }
732   }
733
734   /* now configure the values, we use these to track timestamps on the
735    * sinkpad. */
736   if (segment->format != GST_FORMAT_TIME) {
737     /* non-time format, pretend the current time segment is closed with a
738      * 0 start and unknown stop time. */
739     segment->format = GST_FORMAT_TIME;
740     segment->start = 0;
741     segment->stop = -1;
742     segment->time = 0;
743   }
744
745   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
746
747   if (is_sink)
748     queue->sink_tainted = TRUE;
749   else
750     queue->src_tainted = TRUE;
751
752   /* segment can update the time level of the queue */
753   update_time_level (queue);
754 }
755
756 static void
757 apply_gap (GstQueue2 * queue, GstEvent * event,
758     GstSegment * segment, gboolean is_sink)
759 {
760   GstClockTime timestamp;
761   GstClockTime duration;
762
763   gst_event_parse_gap (event, &timestamp, &duration);
764
765   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
766
767     if (GST_CLOCK_TIME_IS_VALID (duration)) {
768       timestamp += duration;
769     }
770
771     segment->position = timestamp;
772
773     if (is_sink)
774       queue->sink_tainted = TRUE;
775     else
776       queue->src_tainted = TRUE;
777
778     /* calc diff with other end */
779     update_time_level (queue);
780   }
781 }
782
783 /* take a buffer and update segment, updating the time level of the queue. */
784 static void
785 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
786     gboolean is_sink)
787 {
788   GstClockTime duration, timestamp;
789
790   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
791   duration = GST_BUFFER_DURATION (buffer);
792
793   /* if no timestamp is set, assume it's continuous with the previous
794    * time */
795   if (timestamp == GST_CLOCK_TIME_NONE)
796     timestamp = segment->position;
797
798   /* add duration */
799   if (duration != GST_CLOCK_TIME_NONE)
800     timestamp += duration;
801
802   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
803       GST_TIME_ARGS (timestamp));
804
805   segment->position = timestamp;
806
807   if (is_sink)
808     queue->sink_tainted = TRUE;
809   else
810     queue->src_tainted = TRUE;
811
812   /* calc diff with other end */
813   update_time_level (queue);
814 }
815
816 static gboolean
817 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
818 {
819   GstClockTime *timestamp = data;
820   GstClockTime btime;
821
822   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
823       " duration %" GST_TIME_FORMAT, idx,
824       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
825       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
826       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
827
828   btime = GST_BUFFER_DTS_OR_PTS (*buf);
829   if (GST_CLOCK_TIME_IS_VALID (btime))
830     *timestamp = btime;
831
832   if (GST_BUFFER_DURATION_IS_VALID (*buf))
833     *timestamp += GST_BUFFER_DURATION (*buf);
834
835   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
836   return TRUE;
837 }
838
839 /* take a buffer list and update segment, updating the time level of the queue */
840 static void
841 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
842     GstSegment * segment, gboolean is_sink)
843 {
844   GstClockTime timestamp;
845
846   /* if no timestamp is set, assume it's continuous with the previous time */
847   timestamp = segment->position;
848
849   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
850
851   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
852       GST_TIME_ARGS (timestamp));
853
854   segment->position = timestamp;
855
856   if (is_sink)
857     queue->sink_tainted = TRUE;
858   else
859     queue->src_tainted = TRUE;
860
861   /* calc diff with other end */
862   update_time_level (queue);
863 }
864
865 static inline gint
866 get_percent (guint64 cur_level, guint64 max_level, guint64 alt_max)
867 {
868   guint64 p;
869
870   if (max_level == 0)
871     return 0;
872
873   if (alt_max > 0)
874     p = gst_util_uint64_scale (cur_level, 100, MIN (max_level, alt_max));
875   else
876     p = gst_util_uint64_scale (cur_level, 100, max_level);
877
878   return MIN (p, 100);
879 }
880
881 static gboolean
882 get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering,
883     gint * percent)
884 {
885   gint perc, perc2;
886
887   if (queue->high_percent <= 0) {
888     if (percent)
889       *percent = 100;
890     if (is_buffering)
891       *is_buffering = FALSE;
892     return FALSE;
893   }
894 #define GET_PERCENT(format,alt_max) \
895     get_percent(queue->cur_level.format,queue->max_level.format,(alt_max))
896
897   if (queue->is_eos) {
898     /* on EOS we are always 100% full, we set the var here so that it we can
899      * reuse the logic below to stop buffering */
900     perc = 100;
901     GST_LOG_OBJECT (queue, "we are EOS");
902   } else {
903     GST_LOG_OBJECT (queue,
904         "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
905         queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
906         queue->cur_level.buffers);
907
908     /* figure out the percent we are filled, we take the max of all formats. */
909     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
910       perc = GET_PERCENT (bytes, 0);
911     } else {
912       guint64 rb_size = queue->ring_buffer_max_size;
913       perc = GET_PERCENT (bytes, rb_size);
914     }
915
916     perc2 = GET_PERCENT (time, 0);
917     perc = MAX (perc, perc2);
918
919     perc2 = GET_PERCENT (buffers, 0);
920     perc = MAX (perc, perc2);
921
922     /* also apply the rate estimate when we need to */
923     if (queue->use_rate_estimate) {
924       perc2 = GET_PERCENT (rate_time, 0);
925       perc = MAX (perc, perc2);
926     }
927
928     /* Don't get to 0% unless we're really empty */
929     if (queue->cur_level.bytes > 0)
930       perc = MAX (1, perc);
931   }
932 #undef GET_PERCENT
933
934   if (is_buffering)
935     *is_buffering = queue->is_buffering;
936
937   /* scale to high percent so that it becomes the 100% mark */
938   perc = perc * 100 / queue->high_percent;
939   /* clip */
940   if (perc > 100)
941     perc = 100;
942
943   if (percent)
944     *percent = perc;
945
946   GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering,
947       perc);
948
949   return TRUE;
950 }
951
952 static void
953 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
954     gint * avg_in, gint * avg_out, gint64 * buffering_left)
955 {
956   if (mode) {
957     if (!QUEUE_IS_USING_QUEUE (queue)) {
958       if (QUEUE_IS_USING_RING_BUFFER (queue))
959         *mode = GST_BUFFERING_TIMESHIFT;
960       else
961         *mode = GST_BUFFERING_DOWNLOAD;
962     } else {
963       *mode = GST_BUFFERING_STREAM;
964     }
965   }
966
967   if (avg_in)
968     *avg_in = queue->byte_in_rate;
969   if (avg_out)
970     *avg_out = queue->byte_out_rate;
971
972   if (buffering_left) {
973     *buffering_left = (percent == 100 ? 0 : -1);
974
975     if (queue->use_rate_estimate) {
976       guint64 max, cur;
977
978       max = queue->max_level.rate_time;
979       cur = queue->cur_level.rate_time;
980
981       if (percent != 100 && max > cur)
982         *buffering_left = (max - cur) / 1000000;
983     }
984   }
985 }
986
987 static void
988 gst_queue2_post_buffering (GstQueue2 * queue)
989 {
990   GstMessage *msg = NULL;
991
992   g_mutex_lock (&queue->buffering_post_lock);
993   GST_QUEUE2_MUTEX_LOCK (queue);
994   if (queue->percent_changed) {
995     gint percent = queue->buffering_percent;
996
997     queue->percent_changed = FALSE;
998
999     GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
1000     msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
1001
1002     gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
1003         queue->avg_out, queue->buffering_left);
1004   }
1005   GST_QUEUE2_MUTEX_UNLOCK (queue);
1006
1007   if (msg != NULL)
1008     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
1009
1010   g_mutex_unlock (&queue->buffering_post_lock);
1011 }
1012
1013 static void
1014 update_buffering (GstQueue2 * queue)
1015 {
1016   gint percent;
1017
1018   /* Ensure the variables used to calculate buffering state are up-to-date. */
1019   if (queue->current)
1020     update_cur_level (queue, queue->current);
1021   update_in_rates (queue);
1022
1023   if (!get_buffering_percent (queue, NULL, &percent))
1024     return;
1025
1026   if (queue->is_buffering) {
1027     /* if we were buffering see if we reached the high watermark */
1028     if (percent >= 100)
1029       queue->is_buffering = FALSE;
1030
1031     SET_PERCENT (queue, percent);
1032   } else {
1033     /* we were not buffering, check if we need to start buffering if we drop
1034      * below the low threshold */
1035     if (percent < queue->low_percent) {
1036       queue->is_buffering = TRUE;
1037       SET_PERCENT (queue, percent);
1038     }
1039   }
1040 }
1041
1042 static void
1043 reset_rate_timer (GstQueue2 * queue)
1044 {
1045   queue->bytes_in = 0;
1046   queue->bytes_out = 0;
1047   queue->byte_in_rate = 0.0;
1048   queue->byte_in_period = 0;
1049   queue->byte_out_rate = 0.0;
1050   queue->last_update_in_rates_elapsed = 0.0;
1051   queue->last_in_elapsed = 0.0;
1052   queue->last_out_elapsed = 0.0;
1053   queue->in_timer_started = FALSE;
1054   queue->out_timer_started = FALSE;
1055 }
1056
1057 /* the interval in seconds to recalculate the rate */
1058 #define RATE_INTERVAL    0.2
1059 /* Tuning for rate estimation. We use a large window for the input rate because
1060  * it should be stable when connected to a network. The output rate is less
1061  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1062  * therefore adapt more quickly.
1063  * However, initial input rate may be subject to a burst, and should therefore
1064  * initially also adapt more quickly to changes, and only later on give higher
1065  * weight to previous values. */
1066 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1067 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1068
1069 static void
1070 update_in_rates (GstQueue2 * queue)
1071 {
1072   gdouble elapsed, period;
1073   gdouble byte_in_rate;
1074
1075   if (!queue->in_timer_started) {
1076     queue->in_timer_started = TRUE;
1077     g_timer_start (queue->in_timer);
1078     return;
1079   }
1080
1081   queue->last_update_in_rates_elapsed = elapsed =
1082       g_timer_elapsed (queue->in_timer, NULL);
1083
1084   /* recalc after each interval. */
1085   if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1086     period = elapsed - queue->last_in_elapsed;
1087
1088     GST_DEBUG_OBJECT (queue,
1089         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1090         period, queue->bytes_in, queue->byte_in_period);
1091
1092     byte_in_rate = queue->bytes_in / period;
1093
1094     if (queue->byte_in_rate == 0.0)
1095       queue->byte_in_rate = byte_in_rate;
1096     else
1097       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1098           (double) queue->byte_in_period, period);
1099
1100     /* another data point, cap at 16 for long time running average */
1101     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1102       queue->byte_in_period += period;
1103
1104     /* reset the values to calculate rate over the next interval */
1105     queue->last_in_elapsed = elapsed;
1106     queue->bytes_in = 0;
1107   }
1108
1109   if (queue->byte_in_rate > 0.0) {
1110     queue->cur_level.rate_time =
1111         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1112   }
1113   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1114       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1115 }
1116
1117 static void
1118 update_out_rates (GstQueue2 * queue)
1119 {
1120   gdouble elapsed, period;
1121   gdouble byte_out_rate;
1122
1123   if (!queue->out_timer_started) {
1124     queue->out_timer_started = TRUE;
1125     g_timer_start (queue->out_timer);
1126     return;
1127   }
1128
1129   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1130
1131   /* recalc after each interval. */
1132   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1133     period = elapsed - queue->last_out_elapsed;
1134
1135     GST_DEBUG_OBJECT (queue,
1136         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1137
1138     byte_out_rate = queue->bytes_out / period;
1139
1140     if (queue->byte_out_rate == 0.0)
1141       queue->byte_out_rate = byte_out_rate;
1142     else
1143       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1144
1145     /* reset the values to calculate rate over the next interval */
1146     queue->last_out_elapsed = elapsed;
1147     queue->bytes_out = 0;
1148   }
1149   if (queue->byte_in_rate > 0.0) {
1150     queue->cur_level.rate_time =
1151         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1152   }
1153   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1154       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1155 }
1156
1157 static void
1158 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1159 {
1160   guint64 reading_pos, max_reading_pos;
1161
1162   reading_pos = pos;
1163   max_reading_pos = range->max_reading_pos;
1164
1165   max_reading_pos = MAX (max_reading_pos, reading_pos);
1166
1167   GST_DEBUG_OBJECT (queue,
1168       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1169       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1170   range->max_reading_pos = max_reading_pos;
1171
1172   update_cur_level (queue, range);
1173 }
1174
1175 static gboolean
1176 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1177 {
1178   GstEvent *event;
1179   gboolean res;
1180
1181   /* until we receive the FLUSH_STOP from this seek, we skip data */
1182   queue->seeking = TRUE;
1183   GST_QUEUE2_MUTEX_UNLOCK (queue);
1184
1185   debug_ranges (queue);
1186
1187   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1188
1189   event =
1190       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1191       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1192       GST_SEEK_TYPE_NONE, -1);
1193
1194   res = gst_pad_push_event (queue->sinkpad, event);
1195   GST_QUEUE2_MUTEX_LOCK (queue);
1196
1197   if (res) {
1198     /* Between us sending the seek event and re-acquiring the lock, the source
1199      * thread might already have pushed data and moved along the range's
1200      * writing_pos beyond the seek offset. In that case we don't want to set
1201      * the writing position back to the requested seek position, as it would
1202      * cause data to be written to the wrong offset in the file or ring buffer.
1203      * We still do the add_range call to switch the current range to the
1204      * requested range, or create one if one doesn't exist yet. */
1205     queue->current = add_range (queue, offset, FALSE);
1206   }
1207
1208   return res;
1209 }
1210
1211 /* get the threshold for when we decide to seek rather than wait */
1212 static guint64
1213 get_seek_threshold (GstQueue2 * queue)
1214 {
1215   guint64 threshold;
1216
1217   /* FIXME, find a good threshold based on the incoming rate. */
1218   threshold = 1024 * 512;
1219
1220   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1221     threshold = MIN (threshold,
1222         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1223   }
1224   return threshold;
1225 }
1226
1227 /* see if there is enough data in the file to read a full buffer */
1228 static gboolean
1229 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1230 {
1231   GstQueue2Range *range;
1232
1233   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1234       offset, length);
1235
1236   if ((range = find_range (queue, offset))) {
1237     if (queue->current != range) {
1238       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1239       perform_seek_to_offset (queue, range->writing_pos);
1240     }
1241
1242     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1243         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1244
1245     /* we have a range for offset */
1246     GST_DEBUG_OBJECT (queue,
1247         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1248         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1249
1250     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1251       return TRUE;
1252
1253     if (offset + length <= range->writing_pos)
1254       return TRUE;
1255     else
1256       GST_DEBUG_OBJECT (queue,
1257           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1258           (offset + length) - range->writing_pos);
1259
1260   } else {
1261     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1262         " len %u", offset, length);
1263     /* we don't have the range, see how far away we are */
1264     if (!queue->is_eos && queue->current) {
1265       guint64 threshold = get_seek_threshold (queue);
1266
1267       if (offset >= queue->current->offset && offset <=
1268           queue->current->writing_pos + threshold) {
1269         GST_INFO_OBJECT (queue,
1270             "requested data is within range, wait for data");
1271         return FALSE;
1272       }
1273     }
1274
1275     /* too far away, do a seek */
1276     perform_seek_to_offset (queue, offset);
1277   }
1278
1279   return FALSE;
1280 }
1281
1282 #ifdef HAVE_FSEEKO
1283 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1284 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1285 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1286 #else
1287 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1288 #endif
1289
1290 static GstFlowReturn
1291 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1292     guint8 * dst, gint64 * read_return)
1293 {
1294   guint8 *ring_buffer;
1295   size_t res;
1296
1297   ring_buffer = queue->ring_buffer;
1298
1299   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1300     goto seek_failed;
1301
1302   /* this should not block */
1303   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1304       length, offset);
1305   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1306     res = fread (dst, 1, length, queue->temp_file);
1307   } else {
1308     memcpy (dst, ring_buffer + offset, length);
1309     res = length;
1310   }
1311
1312   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1313
1314   if (G_UNLIKELY (res < length)) {
1315     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1316       goto could_not_read;
1317     /* check for errors or EOF */
1318     if (ferror (queue->temp_file))
1319       goto could_not_read;
1320     if (feof (queue->temp_file) && length > 0)
1321       goto eos;
1322   }
1323
1324   *read_return = res;
1325
1326   return GST_FLOW_OK;
1327
1328 seek_failed:
1329   {
1330     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1331     return GST_FLOW_ERROR;
1332   }
1333 could_not_read:
1334   {
1335     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1336     return GST_FLOW_ERROR;
1337   }
1338 eos:
1339   {
1340     GST_DEBUG ("non-regular file hits EOS");
1341     return GST_FLOW_EOS;
1342   }
1343 }
1344
1345 static GstFlowReturn
1346 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1347     GstBuffer ** buffer)
1348 {
1349   GstBuffer *buf;
1350   GstMapInfo info;
1351   guint8 *data;
1352   guint64 file_offset;
1353   guint block_length, remaining, read_length;
1354   guint64 rb_size;
1355   guint64 max_size;
1356   guint64 rpos;
1357   GstFlowReturn ret = GST_FLOW_OK;
1358
1359   /* allocate the output buffer of the requested size */
1360   if (*buffer == NULL)
1361     buf = gst_buffer_new_allocate (NULL, length, NULL);
1362   else
1363     buf = *buffer;
1364
1365   gst_buffer_map (buf, &info, GST_MAP_WRITE);
1366   data = info.data;
1367
1368   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1369       offset);
1370
1371   rpos = offset;
1372   rb_size = queue->ring_buffer_max_size;
1373   max_size = QUEUE_MAX_BYTES (queue);
1374
1375   remaining = length;
1376   while (remaining > 0) {
1377     /* configure how much/whether to read */
1378     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1379       read_length = 0;
1380
1381       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1382         guint64 level;
1383
1384         /* calculate how far away the offset is */
1385         if (queue->current->writing_pos > rpos)
1386           level = queue->current->writing_pos - rpos;
1387         else
1388           level = 0;
1389
1390         GST_DEBUG_OBJECT (queue,
1391             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1392             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1393             rpos, queue->current->writing_pos, level, max_size);
1394
1395         if (level >= max_size) {
1396           /* we don't have the data but if we have a ring buffer that is full, we
1397            * need to read */
1398           GST_DEBUG_OBJECT (queue,
1399               "ring buffer full, reading QUEUE_MAX_BYTES %"
1400               G_GUINT64_FORMAT " bytes", max_size);
1401           read_length = max_size;
1402         } else if (queue->is_eos) {
1403           /* won't get any more data so read any data we have */
1404           if (level) {
1405             GST_DEBUG_OBJECT (queue,
1406                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1407                 level);
1408             read_length = level;
1409             remaining = level;
1410             length = level;
1411           } else
1412             goto hit_eos;
1413         }
1414       }
1415
1416       if (read_length == 0) {
1417         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1418           GST_DEBUG_OBJECT (queue,
1419               "update current position [%" G_GUINT64_FORMAT "-%"
1420               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1421           update_cur_pos (queue, queue->current, rpos);
1422           GST_QUEUE2_SIGNAL_DEL (queue);
1423         }
1424
1425         if (queue->use_buffering)
1426           update_buffering (queue);
1427
1428         GST_DEBUG_OBJECT (queue, "waiting for add");
1429         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1430         continue;
1431       }
1432     } else {
1433       /* we have the requested data so read it */
1434       read_length = remaining;
1435     }
1436
1437     /* set range reading_pos to actual reading position for this read */
1438     queue->current->reading_pos = rpos;
1439
1440     /* configure how much and from where to read */
1441     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1442       file_offset =
1443           (queue->current->rb_offset + (rpos -
1444               queue->current->offset)) % rb_size;
1445       if (file_offset + read_length > rb_size) {
1446         block_length = rb_size - file_offset;
1447       } else {
1448         block_length = read_length;
1449       }
1450     } else {
1451       file_offset = rpos;
1452       block_length = read_length;
1453     }
1454
1455     /* while we still have data to read, we loop */
1456     while (read_length > 0) {
1457       gint64 read_return;
1458
1459       ret =
1460           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1461           data, &read_return);
1462       if (ret != GST_FLOW_OK)
1463         goto read_error;
1464
1465       file_offset += read_return;
1466       if (QUEUE_IS_USING_RING_BUFFER (queue))
1467         file_offset %= rb_size;
1468
1469       data += read_return;
1470       read_length -= read_return;
1471       block_length = read_length;
1472       remaining -= read_return;
1473
1474       rpos = (queue->current->reading_pos += read_return);
1475       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1476     }
1477     GST_QUEUE2_SIGNAL_DEL (queue);
1478     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1479   }
1480
1481   gst_buffer_unmap (buf, &info);
1482   gst_buffer_resize (buf, 0, length);
1483
1484   GST_BUFFER_OFFSET (buf) = offset;
1485   GST_BUFFER_OFFSET_END (buf) = offset + length;
1486
1487   *buffer = buf;
1488
1489   return ret;
1490
1491   /* ERRORS */
1492 hit_eos:
1493   {
1494     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1495     gst_buffer_unmap (buf, &info);
1496     if (*buffer == NULL)
1497       gst_buffer_unref (buf);
1498     return GST_FLOW_EOS;
1499   }
1500 out_flushing:
1501   {
1502     GST_DEBUG_OBJECT (queue, "we are flushing");
1503     gst_buffer_unmap (buf, &info);
1504     if (*buffer == NULL)
1505       gst_buffer_unref (buf);
1506     return GST_FLOW_FLUSHING;
1507   }
1508 read_error:
1509   {
1510     GST_DEBUG_OBJECT (queue, "we have a read error");
1511     gst_buffer_unmap (buf, &info);
1512     if (*buffer == NULL)
1513       gst_buffer_unref (buf);
1514     return ret;
1515   }
1516 }
1517
1518 /* should be called with QUEUE_LOCK */
1519 static GstMiniObject *
1520 gst_queue2_read_item_from_file (GstQueue2 * queue)
1521 {
1522   GstMiniObject *item;
1523
1524   if (queue->stream_start_event != NULL) {
1525     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1526     queue->stream_start_event = NULL;
1527   } else if (queue->starting_segment != NULL) {
1528     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1529     queue->starting_segment = NULL;
1530   } else {
1531     GstFlowReturn ret;
1532     GstBuffer *buffer = NULL;
1533     guint64 reading_pos;
1534
1535     reading_pos = queue->current->reading_pos;
1536
1537     ret =
1538         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1539         &buffer);
1540
1541     switch (ret) {
1542       case GST_FLOW_OK:
1543         item = GST_MINI_OBJECT_CAST (buffer);
1544         break;
1545       case GST_FLOW_EOS:
1546         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1547         break;
1548       default:
1549         item = NULL;
1550         break;
1551     }
1552   }
1553   return item;
1554 }
1555
1556 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1557  * the temp filename. */
1558 static gboolean
1559 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1560 {
1561   gint fd = -1;
1562   gchar *name = NULL;
1563
1564   if (queue->temp_file)
1565     goto already_opened;
1566
1567   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1568
1569   /* If temp_template was set, allocate a filename and open that file */
1570
1571   /* nothing to do */
1572   if (queue->temp_template == NULL)
1573     goto no_directory;
1574
1575   /* make copy of the template, we don't want to change this */
1576   name = g_strdup (queue->temp_template);
1577
1578 #ifdef __BIONIC__
1579   fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1580 #else
1581   fd = g_mkstemp (name);
1582 #endif
1583
1584   if (fd == -1)
1585     goto mkstemp_failed;
1586
1587   /* open the file for update/writing */
1588   queue->temp_file = fdopen (fd, "wb+");
1589   /* error creating file */
1590   if (queue->temp_file == NULL)
1591     goto open_failed;
1592
1593   g_free (queue->temp_location);
1594   queue->temp_location = name;
1595
1596   GST_QUEUE2_MUTEX_UNLOCK (queue);
1597
1598   /* we can't emit the notify with the lock */
1599   g_object_notify (G_OBJECT (queue), "temp-location");
1600
1601   GST_QUEUE2_MUTEX_LOCK (queue);
1602
1603   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1604
1605   return TRUE;
1606
1607   /* ERRORS */
1608 already_opened:
1609   {
1610     GST_DEBUG_OBJECT (queue, "temp file was already open");
1611     return TRUE;
1612   }
1613 no_directory:
1614   {
1615     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1616         (_("No Temp directory specified.")), (NULL));
1617     return FALSE;
1618   }
1619 mkstemp_failed:
1620   {
1621     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1622         (_("Could not create temp file \"%s\"."), queue->temp_template),
1623         GST_ERROR_SYSTEM);
1624     g_free (name);
1625     return FALSE;
1626   }
1627 open_failed:
1628   {
1629     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1630         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1631     g_free (name);
1632     if (fd != -1)
1633       close (fd);
1634     return FALSE;
1635   }
1636 }
1637
1638 static void
1639 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1640 {
1641   /* nothing to do */
1642   if (queue->temp_file == NULL)
1643     return;
1644
1645   GST_DEBUG_OBJECT (queue, "closing temp file");
1646
1647   fflush (queue->temp_file);
1648   fclose (queue->temp_file);
1649
1650   if (queue->temp_remove) {
1651     if (remove (queue->temp_location) < 0) {
1652       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1653           queue->temp_location, g_strerror (errno));
1654     }
1655   }
1656
1657   queue->temp_file = NULL;
1658   clean_ranges (queue);
1659 }
1660
1661 static void
1662 gst_queue2_flush_temp_file (GstQueue2 * queue)
1663 {
1664   if (queue->temp_file == NULL)
1665     return;
1666
1667   GST_DEBUG_OBJECT (queue, "flushing temp file");
1668
1669   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1670 }
1671
1672 static void
1673 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1674 {
1675   if (!QUEUE_IS_USING_QUEUE (queue)) {
1676     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1677       gst_queue2_flush_temp_file (queue);
1678     init_ranges (queue);
1679   } else {
1680     while (!g_queue_is_empty (&queue->queue)) {
1681       GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1682
1683       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1684           && GST_EVENT_IS_STICKY (qitem->item)
1685           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1686           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1687         gst_pad_store_sticky_event (queue->srcpad,
1688             GST_EVENT_CAST (qitem->item));
1689       }
1690
1691       /* Then lose another reference because we are supposed to destroy that
1692          data when flushing */
1693       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1694         gst_mini_object_unref (qitem->item);
1695       g_slice_free (GstQueue2Item, qitem);
1696     }
1697   }
1698   queue->last_query = FALSE;
1699   g_cond_signal (&queue->query_handled);
1700   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1701   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1702   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1703   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1704   queue->sink_tainted = queue->src_tainted = TRUE;
1705   if (queue->starting_segment != NULL)
1706     gst_event_unref (queue->starting_segment);
1707   queue->starting_segment = NULL;
1708   queue->segment_event_received = FALSE;
1709   gst_event_replace (&queue->stream_start_event, NULL);
1710
1711   /* we deleted a lot of something */
1712   GST_QUEUE2_SIGNAL_DEL (queue);
1713 }
1714
1715 static gboolean
1716 gst_queue2_wait_free_space (GstQueue2 * queue)
1717 {
1718   /* We make space available if we're "full" according to whatever
1719    * the user defined as "full". */
1720   if (gst_queue2_is_filled (queue)) {
1721     gboolean started;
1722
1723     /* pause the timer while we wait. The fact that we are waiting does not mean
1724      * the byterate on the input pad is lower */
1725     if ((started = queue->in_timer_started))
1726       g_timer_stop (queue->in_timer);
1727
1728     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1729         "queue is full, waiting for free space");
1730     do {
1731       GST_QUEUE2_MUTEX_UNLOCK (queue);
1732       g_signal_emit (queue, gst_queue2_signals[SIGNAL_OVERRUN], 0);
1733       GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
1734       /* we recheck, the signal could have changed the thresholds */
1735       if (!gst_queue2_is_filled (queue))
1736         break;
1737
1738       /* Wait for space to be available, we could be unlocked because of a flush. */
1739       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1740     }
1741     while (gst_queue2_is_filled (queue));
1742
1743     /* and continue if we were running before */
1744     if (started)
1745       g_timer_continue (queue->in_timer);
1746   }
1747   return TRUE;
1748
1749   /* ERRORS */
1750 out_flushing:
1751   {
1752     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1753     return FALSE;
1754   }
1755 }
1756
1757 static gboolean
1758 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1759 {
1760   GstMapInfo info;
1761   guint8 *data, *ring_buffer;
1762   guint size, rb_size;
1763   guint64 writing_pos, new_writing_pos;
1764   GstQueue2Range *range, *prev, *next;
1765   gboolean do_seek = FALSE;
1766
1767   if (QUEUE_IS_USING_RING_BUFFER (queue))
1768     writing_pos = queue->current->rb_writing_pos;
1769   else
1770     writing_pos = queue->current->writing_pos;
1771   ring_buffer = queue->ring_buffer;
1772   rb_size = queue->ring_buffer_max_size;
1773
1774   gst_buffer_map (buffer, &info, GST_MAP_READ);
1775
1776   size = info.size;
1777   data = info.data;
1778
1779   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1780       writing_pos);
1781
1782   /* sanity check */
1783   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1784       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1785     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1786         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1787         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1788   }
1789
1790   while (size > 0) {
1791     guint to_write;
1792
1793     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1794       gint64 space;
1795
1796       /* calculate the space in the ring buffer not used by data from
1797        * the current range */
1798       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1799         /* wait until there is some free space */
1800         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1801       }
1802       /* get the amount of space we have */
1803       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1804
1805       /* calculate if we need to split or if we can write the entire
1806        * buffer now */
1807       to_write = MIN (size, space);
1808
1809       /* the writing position in the ring buffer after writing (part
1810        * or all of) the buffer */
1811       new_writing_pos = (writing_pos + to_write) % rb_size;
1812
1813       prev = NULL;
1814       range = queue->ranges;
1815
1816       /* if we need to overwrite data in the ring buffer, we need to
1817        * update the ranges
1818        *
1819        * warning: this code is complicated and includes some
1820        * simplifications - pen, paper and diagrams for the cases
1821        * recommended! */
1822       while (range) {
1823         guint64 range_data_start, range_data_end;
1824         GstQueue2Range *range_to_destroy = NULL;
1825
1826         range_data_start = range->rb_offset;
1827         range_data_end = range->rb_writing_pos;
1828
1829         /* handle the special case where the range has no data in it */
1830         if (range->writing_pos == range->offset) {
1831           if (range != queue->current) {
1832             GST_DEBUG_OBJECT (queue,
1833                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1834                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1835             /* remove range */
1836             range_to_destroy = range;
1837             if (prev)
1838               prev->next = range->next;
1839           }
1840           goto next_range;
1841         }
1842
1843         if (range_data_end > range_data_start) {
1844           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1845             goto next_range;
1846
1847           if (new_writing_pos > range_data_start) {
1848             if (new_writing_pos >= range_data_end) {
1849               GST_DEBUG_OBJECT (queue,
1850                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1851                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1852               /* remove range */
1853               range_to_destroy = range;
1854               if (prev)
1855                 prev->next = range->next;
1856             } else {
1857               GST_DEBUG_OBJECT (queue,
1858                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1859                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1860                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1861                   range->offset + new_writing_pos - range_data_start,
1862                   new_writing_pos);
1863               range->offset += (new_writing_pos - range_data_start);
1864               range->rb_offset = new_writing_pos;
1865             }
1866           }
1867         } else {
1868           guint64 new_wpos_virt = writing_pos + to_write;
1869
1870           if (new_wpos_virt <= range_data_start)
1871             goto next_range;
1872
1873           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1874             GST_DEBUG_OBJECT (queue,
1875                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1876                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1877             /* remove range */
1878             range_to_destroy = range;
1879             if (prev)
1880               prev->next = range->next;
1881           } else {
1882             GST_DEBUG_OBJECT (queue,
1883                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1884                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1885                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1886                 range->offset + new_writing_pos - range_data_start,
1887                 new_writing_pos);
1888             range->offset += (new_wpos_virt - range_data_start);
1889             range->rb_offset = new_writing_pos;
1890           }
1891         }
1892
1893       next_range:
1894         if (!range_to_destroy)
1895           prev = range;
1896
1897         range = range->next;
1898         if (range_to_destroy) {
1899           if (range_to_destroy == queue->ranges)
1900             queue->ranges = range;
1901           g_slice_free (GstQueue2Range, range_to_destroy);
1902           range_to_destroy = NULL;
1903         }
1904       }
1905     } else {
1906       to_write = size;
1907       new_writing_pos = writing_pos + to_write;
1908     }
1909
1910     if (QUEUE_IS_USING_TEMP_FILE (queue)
1911         && FSEEK_FILE (queue->temp_file, writing_pos))
1912       goto seek_failed;
1913
1914     if (new_writing_pos > writing_pos) {
1915       GST_INFO_OBJECT (queue,
1916           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1917           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
1918           queue->current->writing_pos, queue->current->rb_writing_pos);
1919       /* either not using ring buffer or no wrapping, just write */
1920       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1921         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
1922           goto handle_error;
1923       } else {
1924         memcpy (ring_buffer + writing_pos, data, to_write);
1925       }
1926
1927       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1928         /* try to merge with next range */
1929         while ((next = queue->current->next)) {
1930           GST_INFO_OBJECT (queue,
1931               "checking merge with next range %" G_GUINT64_FORMAT " < %"
1932               G_GUINT64_FORMAT, new_writing_pos, next->offset);
1933           if (new_writing_pos < next->offset)
1934             break;
1935
1936           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
1937               next->writing_pos);
1938
1939           /* remove the group */
1940           queue->current->next = next->next;
1941
1942           /* We use the threshold to decide if we want to do a seek or simply
1943            * read the data again. If there is not so much data in the range we
1944            * prefer to avoid to seek and read it again. */
1945           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
1946             /* the new range had more data than the threshold, it's worth keeping
1947              * it and doing a seek. */
1948             new_writing_pos = next->writing_pos;
1949             do_seek = TRUE;
1950           }
1951           g_slice_free (GstQueue2Range, next);
1952         }
1953         goto update_and_signal;
1954       }
1955     } else {
1956       /* wrapping */
1957       guint block_one, block_two;
1958
1959       block_one = rb_size - writing_pos;
1960       block_two = to_write - block_one;
1961
1962       if (block_one > 0) {
1963         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
1964         /* write data to end of ring buffer */
1965         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1966           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
1967             goto handle_error;
1968         } else {
1969           memcpy (ring_buffer + writing_pos, data, block_one);
1970         }
1971       }
1972
1973       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
1974         goto seek_failed;
1975
1976       if (block_two > 0) {
1977         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
1978         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1979           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
1980             goto handle_error;
1981         } else {
1982           memcpy (ring_buffer, data + block_one, block_two);
1983         }
1984       }
1985     }
1986
1987   update_and_signal:
1988     /* update the writing positions */
1989     size -= to_write;
1990     GST_INFO_OBJECT (queue,
1991         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
1992         to_write, writing_pos, size);
1993
1994     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1995       data += to_write;
1996       queue->current->writing_pos += to_write;
1997       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
1998     } else {
1999       queue->current->writing_pos = writing_pos = new_writing_pos;
2000     }
2001     if (do_seek)
2002       perform_seek_to_offset (queue, new_writing_pos);
2003
2004     update_cur_level (queue, queue->current);
2005
2006     /* update the buffering status */
2007     if (queue->use_buffering)
2008       update_buffering (queue);
2009
2010     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2011         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2012
2013     GST_QUEUE2_SIGNAL_ADD (queue);
2014   }
2015
2016   gst_buffer_unmap (buffer, &info);
2017
2018   return TRUE;
2019
2020   /* ERRORS */
2021 out_flushing:
2022   {
2023     GST_DEBUG_OBJECT (queue, "we are flushing");
2024     gst_buffer_unmap (buffer, &info);
2025     /* FIXME - GST_FLOW_EOS ? */
2026     return FALSE;
2027   }
2028 seek_failed:
2029   {
2030     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2031     gst_buffer_unmap (buffer, &info);
2032     return FALSE;
2033   }
2034 handle_error:
2035   {
2036     switch (errno) {
2037       case ENOSPC:{
2038         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2039         break;
2040       }
2041       default:{
2042         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2043             (_("Error while writing to download file.")),
2044             ("%s", g_strerror (errno)));
2045       }
2046     }
2047     gst_buffer_unmap (buffer, &info);
2048     return FALSE;
2049   }
2050 }
2051
2052 static gboolean
2053 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2054 {
2055   GstQueue2 *queue = q;
2056
2057   GST_TRACE_OBJECT (queue,
2058       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2059       gst_buffer_get_size (*buf));
2060
2061   if (!gst_queue2_create_write (queue, *buf)) {
2062     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2063     return FALSE;
2064   }
2065   return TRUE;
2066 }
2067
2068 static gboolean
2069 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
2070 {
2071   guint *p_size = data;
2072   gsize buf_size;
2073
2074   buf_size = gst_buffer_get_size (*buf);
2075   GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
2076   *p_size += buf_size;
2077   return TRUE;
2078 }
2079
2080 /* enqueue an item an update the level stats */
2081 static void
2082 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2083     GstQueue2ItemType item_type)
2084 {
2085   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2086     GstBuffer *buffer;
2087     guint size;
2088
2089     buffer = GST_BUFFER_CAST (item);
2090     size = gst_buffer_get_size (buffer);
2091
2092     /* add buffer to the statistics */
2093     if (QUEUE_IS_USING_QUEUE (queue)) {
2094       queue->cur_level.buffers++;
2095       queue->cur_level.bytes += size;
2096     }
2097     queue->bytes_in += size;
2098
2099     /* apply new buffer to segment stats */
2100     apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
2101     /* update the byterate stats */
2102     update_in_rates (queue);
2103
2104     if (!QUEUE_IS_USING_QUEUE (queue)) {
2105       /* FIXME - check return value? */
2106       gst_queue2_create_write (queue, buffer);
2107     }
2108   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2109     GstBufferList *buffer_list;
2110     guint size = 0;
2111
2112     buffer_list = GST_BUFFER_LIST_CAST (item);
2113
2114     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2115     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2116
2117     /* add buffer to the statistics */
2118     if (QUEUE_IS_USING_QUEUE (queue)) {
2119       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2120       queue->cur_level.bytes += size;
2121     }
2122     queue->bytes_in += size;
2123
2124     /* apply new buffer to segment stats */
2125     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2126
2127     /* update the byterate stats */
2128     update_in_rates (queue);
2129
2130     if (!QUEUE_IS_USING_QUEUE (queue)) {
2131       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2132     }
2133   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2134     GstEvent *event;
2135
2136     event = GST_EVENT_CAST (item);
2137
2138     switch (GST_EVENT_TYPE (event)) {
2139       case GST_EVENT_EOS:
2140         /* Zero the thresholds, this makes sure the queue is completely
2141          * filled and we can read all data from the queue. */
2142         GST_DEBUG_OBJECT (queue, "we have EOS");
2143         queue->is_eos = TRUE;
2144         break;
2145       case GST_EVENT_SEGMENT:
2146         apply_segment (queue, event, &queue->sink_segment, TRUE);
2147         /* This is our first new segment, we hold it
2148          * as we can't save it on the temp file */
2149         if (!QUEUE_IS_USING_QUEUE (queue)) {
2150           if (queue->segment_event_received)
2151             goto unexpected_event;
2152
2153           queue->segment_event_received = TRUE;
2154           if (queue->starting_segment != NULL)
2155             gst_event_unref (queue->starting_segment);
2156           queue->starting_segment = event;
2157           item = NULL;
2158         }
2159         /* a new segment allows us to accept more buffers if we got EOS
2160          * from downstream */
2161         queue->unexpected = FALSE;
2162         break;
2163       case GST_EVENT_GAP:
2164         apply_gap (queue, event, &queue->sink_segment, TRUE);
2165         break;
2166       case GST_EVENT_STREAM_START:
2167         if (!QUEUE_IS_USING_QUEUE (queue)) {
2168           gst_event_replace (&queue->stream_start_event, event);
2169           gst_event_unref (event);
2170           item = NULL;
2171         }
2172         break;
2173       case GST_EVENT_CAPS:{
2174         GstCaps *caps;
2175
2176         gst_event_parse_caps (event, &caps);
2177         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2178
2179         if (!QUEUE_IS_USING_QUEUE (queue)) {
2180           GST_LOG ("Dropping caps event, not using queue");
2181           gst_event_unref (event);
2182           item = NULL;
2183         }
2184         break;
2185       }
2186       default:
2187         if (!QUEUE_IS_USING_QUEUE (queue))
2188           goto unexpected_event;
2189         break;
2190     }
2191   } else if (GST_IS_QUERY (item)) {
2192     /* Can't happen as we check that in the caller */
2193     if (!QUEUE_IS_USING_QUEUE (queue))
2194       g_assert_not_reached ();
2195   } else {
2196     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2197         item, GST_OBJECT_NAME (queue));
2198     /* we can't really unref since we don't know what it is */
2199     item = NULL;
2200   }
2201
2202   if (item) {
2203     /* update the buffering status */
2204     if (queue->use_buffering)
2205       update_buffering (queue);
2206
2207     if (QUEUE_IS_USING_QUEUE (queue)) {
2208       GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2209       qitem->type = item_type;
2210       qitem->item = item;
2211       g_queue_push_tail (&queue->queue, qitem);
2212     } else {
2213       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2214     }
2215
2216     GST_QUEUE2_SIGNAL_ADD (queue);
2217   }
2218
2219   return;
2220
2221   /* ERRORS */
2222 unexpected_event:
2223   {
2224     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2225
2226     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2227         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2228         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2229     gst_event_unref (GST_EVENT_CAST (item));
2230     return;
2231   }
2232 }
2233
2234 /* dequeue an item from the queue and update level stats */
2235 static GstMiniObject *
2236 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2237 {
2238   GstMiniObject *item;
2239
2240   if (!QUEUE_IS_USING_QUEUE (queue)) {
2241     item = gst_queue2_read_item_from_file (queue);
2242   } else {
2243     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2244
2245     if (qitem == NULL)
2246       goto no_item;
2247
2248     item = qitem->item;
2249     g_slice_free (GstQueue2Item, qitem);
2250   }
2251
2252   if (item == NULL)
2253     goto no_item;
2254
2255   if (GST_IS_BUFFER (item)) {
2256     GstBuffer *buffer;
2257     guint size;
2258
2259     buffer = GST_BUFFER_CAST (item);
2260     size = gst_buffer_get_size (buffer);
2261     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2262
2263     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2264         "retrieved buffer %p from queue", buffer);
2265
2266     if (QUEUE_IS_USING_QUEUE (queue)) {
2267       queue->cur_level.buffers--;
2268       queue->cur_level.bytes -= size;
2269     }
2270     queue->bytes_out += size;
2271
2272     apply_buffer (queue, buffer, &queue->src_segment, FALSE);
2273     /* update the byterate stats */
2274     update_out_rates (queue);
2275     /* update the buffering */
2276     if (queue->use_buffering)
2277       update_buffering (queue);
2278
2279   } else if (GST_IS_EVENT (item)) {
2280     GstEvent *event = GST_EVENT_CAST (item);
2281
2282     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2283
2284     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2285         "retrieved event %p from queue", event);
2286
2287     switch (GST_EVENT_TYPE (event)) {
2288       case GST_EVENT_EOS:
2289         /* queue is empty now that we dequeued the EOS */
2290         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2291         break;
2292       case GST_EVENT_SEGMENT:
2293         apply_segment (queue, event, &queue->src_segment, FALSE);
2294         break;
2295       case GST_EVENT_GAP:
2296         apply_gap (queue, event, &queue->src_segment, FALSE);
2297         break;
2298       default:
2299         break;
2300     }
2301   } else if (GST_IS_BUFFER_LIST (item)) {
2302     GstBufferList *buffer_list;
2303     guint size = 0;
2304
2305     buffer_list = GST_BUFFER_LIST_CAST (item);
2306     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2307     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2308
2309     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2310         "retrieved buffer list %p from queue", buffer_list);
2311
2312     if (QUEUE_IS_USING_QUEUE (queue)) {
2313       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2314       queue->cur_level.bytes -= size;
2315     }
2316     queue->bytes_out += size;
2317
2318     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2319     /* update the byterate stats */
2320     update_out_rates (queue);
2321     /* update the buffering */
2322     if (queue->use_buffering)
2323       update_buffering (queue);
2324   } else if (GST_IS_QUERY (item)) {
2325     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2326         "retrieved query %p from queue", item);
2327     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2328   } else {
2329     g_warning
2330         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2331         item, GST_OBJECT_NAME (queue));
2332     item = NULL;
2333     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2334   }
2335   GST_QUEUE2_SIGNAL_DEL (queue);
2336
2337   return item;
2338
2339   /* ERRORS */
2340 no_item:
2341   {
2342     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2343     return NULL;
2344   }
2345 }
2346
2347 static gboolean
2348 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2349     GstEvent * event)
2350 {
2351   gboolean ret = TRUE;
2352   GstQueue2 *queue;
2353
2354   queue = GST_QUEUE2 (parent);
2355
2356   switch (GST_EVENT_TYPE (event)) {
2357     case GST_EVENT_FLUSH_START:
2358     {
2359       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2360       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2361         /* forward event */
2362         ret = gst_pad_push_event (queue->srcpad, event);
2363
2364         /* now unblock the chain function */
2365         GST_QUEUE2_MUTEX_LOCK (queue);
2366         queue->srcresult = GST_FLOW_FLUSHING;
2367         queue->sinkresult = GST_FLOW_FLUSHING;
2368         /* unblock the loop and chain functions */
2369         GST_QUEUE2_SIGNAL_ADD (queue);
2370         GST_QUEUE2_SIGNAL_DEL (queue);
2371         queue->last_query = FALSE;
2372         g_cond_signal (&queue->query_handled);
2373         GST_QUEUE2_MUTEX_UNLOCK (queue);
2374
2375         /* make sure it pauses, this should happen since we sent
2376          * flush_start downstream. */
2377         gst_pad_pause_task (queue->srcpad);
2378         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2379       } else {
2380         GST_QUEUE2_MUTEX_LOCK (queue);
2381         /* flush the sink pad */
2382         queue->sinkresult = GST_FLOW_FLUSHING;
2383         GST_QUEUE2_SIGNAL_DEL (queue);
2384         queue->last_query = FALSE;
2385         g_cond_signal (&queue->query_handled);
2386         GST_QUEUE2_MUTEX_UNLOCK (queue);
2387
2388         gst_event_unref (event);
2389       }
2390       break;
2391     }
2392     case GST_EVENT_FLUSH_STOP:
2393     {
2394       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2395
2396       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2397         /* forward event */
2398         ret = gst_pad_push_event (queue->srcpad, event);
2399
2400         GST_QUEUE2_MUTEX_LOCK (queue);
2401         gst_queue2_locked_flush (queue, FALSE, TRUE);
2402         queue->srcresult = GST_FLOW_OK;
2403         queue->sinkresult = GST_FLOW_OK;
2404         queue->is_eos = FALSE;
2405         queue->unexpected = FALSE;
2406         queue->seeking = FALSE;
2407         /* reset rate counters */
2408         reset_rate_timer (queue);
2409         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2410             queue->srcpad, NULL);
2411         GST_QUEUE2_MUTEX_UNLOCK (queue);
2412       } else {
2413         GST_QUEUE2_MUTEX_LOCK (queue);
2414         queue->segment_event_received = FALSE;
2415         queue->is_eos = FALSE;
2416         queue->unexpected = FALSE;
2417         queue->sinkresult = GST_FLOW_OK;
2418         queue->seeking = FALSE;
2419         GST_QUEUE2_MUTEX_UNLOCK (queue);
2420
2421         gst_event_unref (event);
2422       }
2423       break;
2424     }
2425     default:
2426       if (GST_EVENT_IS_SERIALIZED (event)) {
2427         /* serialized events go in the queue */
2428         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2429         if (queue->srcresult != GST_FLOW_OK) {
2430           /* Errors in sticky event pushing are no problem and ignored here
2431            * as they will cause more meaningful errors during data flow.
2432            * For EOS events, that are not followed by data flow, we still
2433            * return FALSE here though and report an error.
2434            */
2435           if (!GST_EVENT_IS_STICKY (event)) {
2436             goto out_flow_error;
2437           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2438             if (queue->srcresult == GST_FLOW_NOT_LINKED
2439                 || queue->srcresult < GST_FLOW_EOS) {
2440               GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2441                   (_("Internal data flow error.")),
2442                   ("streaming task paused, reason %s (%d)",
2443                       gst_flow_get_name (queue->srcresult), queue->srcresult));
2444             }
2445             goto out_flow_error;
2446           }
2447         }
2448         /* refuse more events on EOS */
2449         if (queue->is_eos)
2450           goto out_eos;
2451         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2452         GST_QUEUE2_MUTEX_UNLOCK (queue);
2453         gst_queue2_post_buffering (queue);
2454       } else {
2455         /* non-serialized events are passed upstream. */
2456         ret = gst_pad_push_event (queue->srcpad, event);
2457       }
2458       break;
2459   }
2460   return ret;
2461
2462   /* ERRORS */
2463 out_flushing:
2464   {
2465     GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2466     GST_QUEUE2_MUTEX_UNLOCK (queue);
2467     gst_event_unref (event);
2468     return FALSE;
2469   }
2470 out_eos:
2471   {
2472     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2473     GST_QUEUE2_MUTEX_UNLOCK (queue);
2474     gst_event_unref (event);
2475     return FALSE;
2476   }
2477 out_flow_error:
2478   {
2479     GST_LOG_OBJECT (queue,
2480         "refusing event, we have a downstream flow error: %s",
2481         gst_flow_get_name (queue->srcresult));
2482     GST_QUEUE2_MUTEX_UNLOCK (queue);
2483     gst_event_unref (event);
2484     return FALSE;
2485   }
2486 }
2487
2488 static gboolean
2489 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2490     GstQuery * query)
2491 {
2492   GstQueue2 *queue;
2493   gboolean res;
2494
2495   queue = GST_QUEUE2 (parent);
2496
2497   switch (GST_QUERY_TYPE (query)) {
2498     default:
2499       if (GST_QUERY_IS_SERIALIZED (query)) {
2500         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2501         /* serialized events go in the queue. We need to be certain that we
2502          * don't cause deadlocks waiting for the query return value. We check if
2503          * the queue is empty (nothing is blocking downstream and the query can
2504          * be pushed for sure) or we are not buffering. If we are buffering,
2505          * the pipeline waits to unblock downstream until our queue fills up
2506          * completely, which can not happen if we block on the query..
2507          * Therefore we only potentially block when we are not buffering. */
2508         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2509         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2510                 || !queue->use_buffering)) {
2511           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2512             gst_queue2_locked_enqueue (queue, query,
2513                 GST_QUEUE2_ITEM_TYPE_QUERY);
2514
2515             STATUS (queue, queue->sinkpad, "wait for QUERY");
2516             g_cond_wait (&queue->query_handled, &queue->qlock);
2517             if (queue->sinkresult != GST_FLOW_OK)
2518               goto out_flushing;
2519             res = queue->last_query;
2520           } else {
2521             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2522             res = FALSE;
2523           }
2524         } else {
2525           GST_DEBUG_OBJECT (queue,
2526               "refusing query, we are not using the queue");
2527           res = FALSE;
2528         }
2529         GST_QUEUE2_MUTEX_UNLOCK (queue);
2530         gst_queue2_post_buffering (queue);
2531       } else {
2532         res = gst_pad_query_default (pad, parent, query);
2533       }
2534       break;
2535   }
2536   return res;
2537
2538   /* ERRORS */
2539 out_flushing:
2540   {
2541     GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
2542     GST_QUEUE2_MUTEX_UNLOCK (queue);
2543     return FALSE;
2544   }
2545 }
2546
2547 static gboolean
2548 gst_queue2_is_empty (GstQueue2 * queue)
2549 {
2550   /* never empty on EOS */
2551   if (queue->is_eos)
2552     return FALSE;
2553
2554   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2555     return queue->current->writing_pos <= queue->current->max_reading_pos;
2556   } else {
2557     if (queue->queue.length == 0)
2558       return TRUE;
2559   }
2560
2561   return FALSE;
2562 }
2563
2564 static gboolean
2565 gst_queue2_is_filled (GstQueue2 * queue)
2566 {
2567   gboolean res;
2568
2569   /* always filled on EOS */
2570   if (queue->is_eos)
2571     return TRUE;
2572
2573 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2574     (queue->cur_level.format) >= ((alt_max) ? \
2575       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2576
2577   /* if using a ring buffer we're filled if all ring buffer space is used
2578    * _by the current range_ */
2579   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2580     guint64 rb_size = queue->ring_buffer_max_size;
2581     GST_DEBUG_OBJECT (queue,
2582         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2583         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2584     return CHECK_FILLED (bytes, rb_size);
2585   }
2586
2587   /* if using file, we're never filled if we don't have EOS */
2588   if (QUEUE_IS_USING_TEMP_FILE (queue))
2589     return FALSE;
2590
2591   /* we are never filled when we have no buffers at all */
2592   if (queue->cur_level.buffers == 0)
2593     return FALSE;
2594
2595   /* we are filled if one of the current levels exceeds the max */
2596   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2597       || CHECK_FILLED (time, 0);
2598
2599   /* if we need to, use the rate estimate to check against the max time we are
2600    * allowed to queue */
2601   if (queue->use_rate_estimate)
2602     res |= CHECK_FILLED (rate_time, 0);
2603
2604 #undef CHECK_FILLED
2605   return res;
2606 }
2607
2608 static GstFlowReturn
2609 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2610     GstMiniObject * item, GstQueue2ItemType item_type)
2611 {
2612   /* we have to lock the queue since we span threads */
2613   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2614   /* when we received EOS, we refuse more data */
2615   if (queue->is_eos)
2616     goto out_eos;
2617   /* when we received unexpected from downstream, refuse more buffers */
2618   if (queue->unexpected)
2619     goto out_unexpected;
2620
2621   /* while we didn't receive the newsegment, we're seeking and we skip data */
2622   if (queue->seeking)
2623     goto out_seeking;
2624
2625   if (!gst_queue2_wait_free_space (queue))
2626     goto out_flushing;
2627
2628   /* put buffer in queue now */
2629   gst_queue2_locked_enqueue (queue, item, item_type);
2630   GST_QUEUE2_MUTEX_UNLOCK (queue);
2631   gst_queue2_post_buffering (queue);
2632
2633   return GST_FLOW_OK;
2634
2635   /* special conditions */
2636 out_flushing:
2637   {
2638     GstFlowReturn ret = queue->sinkresult;
2639
2640     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2641         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2642     GST_QUEUE2_MUTEX_UNLOCK (queue);
2643     gst_mini_object_unref (item);
2644
2645     return ret;
2646   }
2647 out_eos:
2648   {
2649     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2650     GST_QUEUE2_MUTEX_UNLOCK (queue);
2651     gst_mini_object_unref (item);
2652
2653     return GST_FLOW_EOS;
2654   }
2655 out_seeking:
2656   {
2657     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2658     GST_QUEUE2_MUTEX_UNLOCK (queue);
2659     gst_mini_object_unref (item);
2660
2661     return GST_FLOW_OK;
2662   }
2663 out_unexpected:
2664   {
2665     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2666     GST_QUEUE2_MUTEX_UNLOCK (queue);
2667     gst_mini_object_unref (item);
2668
2669     return GST_FLOW_EOS;
2670   }
2671 }
2672
2673 static GstFlowReturn
2674 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2675 {
2676   GstQueue2 *queue;
2677
2678   queue = GST_QUEUE2 (parent);
2679
2680   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2681       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2682       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2683       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2684       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2685
2686   return gst_queue2_chain_buffer_or_buffer_list (queue,
2687       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2688 }
2689
2690 static GstFlowReturn
2691 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2692     GstBufferList * buffer_list)
2693 {
2694   GstQueue2 *queue;
2695
2696   queue = GST_QUEUE2 (parent);
2697
2698   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2699       "received buffer list %p", buffer_list);
2700
2701   return gst_queue2_chain_buffer_or_buffer_list (queue,
2702       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2703 }
2704
2705 static GstMiniObject *
2706 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2707 {
2708   GstMiniObject *data;
2709
2710   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2711
2712   /* stop pushing buffers, we dequeue all items until we see an item that we
2713    * can push again, which is EOS or SEGMENT. If there is nothing in the
2714    * queue we can push, we set a flag to make the sinkpad refuse more
2715    * buffers with an EOS return value until we receive something
2716    * pushable again or we get flushed. */
2717   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2718     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2719       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2720           "dropping EOS buffer %p", data);
2721       gst_buffer_unref (GST_BUFFER_CAST (data));
2722     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2723       GstEvent *event = GST_EVENT_CAST (data);
2724       GstEventType type = GST_EVENT_TYPE (event);
2725
2726       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2727         /* we found a pushable item in the queue, push it out */
2728         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2729             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2730         return data;
2731       }
2732       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2733           "dropping EOS event %p", event);
2734       gst_event_unref (event);
2735     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2736       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2737           "dropping EOS buffer list %p", data);
2738       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2739     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2740       queue->last_query = FALSE;
2741       g_cond_signal (&queue->query_handled);
2742       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2743     }
2744   }
2745   /* no more items in the queue. Set the unexpected flag so that upstream
2746    * make us refuse any more buffers on the sinkpad. Since we will still
2747    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2748    * task function does not shut down. */
2749   queue->unexpected = TRUE;
2750   return NULL;
2751 }
2752
2753 /* dequeue an item from the queue an push it downstream. This functions returns
2754  * the result of the push. */
2755 static GstFlowReturn
2756 gst_queue2_push_one (GstQueue2 * queue)
2757 {
2758   GstFlowReturn result = queue->srcresult;
2759   GstMiniObject *data;
2760   GstQueue2ItemType item_type;
2761
2762   data = gst_queue2_locked_dequeue (queue, &item_type);
2763   if (data == NULL)
2764     goto no_item;
2765
2766 next:
2767   STATUS (queue, queue->srcpad, "We have something dequeud");
2768   g_atomic_int_set (&queue->downstream_may_block,
2769       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2770       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2771   GST_QUEUE2_MUTEX_UNLOCK (queue);
2772   gst_queue2_post_buffering (queue);
2773
2774   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2775     GstBuffer *buffer;
2776
2777     buffer = GST_BUFFER_CAST (data);
2778
2779     result = gst_pad_push (queue->srcpad, buffer);
2780     g_atomic_int_set (&queue->downstream_may_block, 0);
2781
2782     /* need to check for srcresult here as well */
2783     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2784     if (result == GST_FLOW_EOS) {
2785       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2786       if (data != NULL)
2787         goto next;
2788       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2789        * to the caller so that the task function does not shut down */
2790       result = GST_FLOW_OK;
2791     }
2792   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2793     GstEvent *event = GST_EVENT_CAST (data);
2794     GstEventType type = GST_EVENT_TYPE (event);
2795
2796     gst_pad_push_event (queue->srcpad, event);
2797
2798     /* if we're EOS, return EOS so that the task pauses. */
2799     if (type == GST_EVENT_EOS) {
2800       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2801           "pushed EOS event %p, return EOS", event);
2802       result = GST_FLOW_EOS;
2803     }
2804
2805     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2806   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2807     GstBufferList *buffer_list;
2808
2809     buffer_list = GST_BUFFER_LIST_CAST (data);
2810
2811     result = gst_pad_push_list (queue->srcpad, buffer_list);
2812     g_atomic_int_set (&queue->downstream_may_block, 0);
2813
2814     /* need to check for srcresult here as well */
2815     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2816     if (result == GST_FLOW_EOS) {
2817       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2818       if (data != NULL)
2819         goto next;
2820       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2821        * to the caller so that the task function does not shut down */
2822       result = GST_FLOW_OK;
2823     }
2824   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2825     GstQuery *query = GST_QUERY_CAST (data);
2826
2827     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2828     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2829     GST_LOG_OBJECT (queue->srcpad, "Peered query");
2830     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2831         "did query %p, return %d", query, queue->last_query);
2832     g_cond_signal (&queue->query_handled);
2833     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2834     result = GST_FLOW_OK;
2835   }
2836   return result;
2837
2838   /* ERRORS */
2839 no_item:
2840   {
2841     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2842         "exit because we have no item in the queue");
2843     return GST_FLOW_ERROR;
2844   }
2845 out_flushing:
2846   {
2847     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2848     return GST_FLOW_FLUSHING;
2849   }
2850 }
2851
2852 /* called repeatedly with @pad as the source pad. This function should push out
2853  * data to the peer element. */
2854 static void
2855 gst_queue2_loop (GstPad * pad)
2856 {
2857   GstQueue2 *queue;
2858   GstFlowReturn ret;
2859
2860   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2861
2862   /* have to lock for thread-safety */
2863   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2864
2865   if (gst_queue2_is_empty (queue)) {
2866     gboolean started;
2867
2868     /* pause the timer while we wait. The fact that we are waiting does not mean
2869      * the byterate on the output pad is lower */
2870     if ((started = queue->out_timer_started))
2871       g_timer_stop (queue->out_timer);
2872
2873     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2874         "queue is empty, waiting for new data");
2875     do {
2876       /* Wait for data to be available, we could be unlocked because of a flush. */
2877       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2878     }
2879     while (gst_queue2_is_empty (queue));
2880
2881     /* and continue if we were running before */
2882     if (started)
2883       g_timer_continue (queue->out_timer);
2884   }
2885   ret = gst_queue2_push_one (queue);
2886   queue->srcresult = ret;
2887   queue->sinkresult = ret;
2888   if (ret != GST_FLOW_OK)
2889     goto out_flushing;
2890
2891   GST_QUEUE2_MUTEX_UNLOCK (queue);
2892   gst_queue2_post_buffering (queue);
2893
2894   return;
2895
2896   /* ERRORS */
2897 out_flushing:
2898   {
2899     gboolean eos = queue->is_eos;
2900     GstFlowReturn ret = queue->srcresult;
2901
2902     gst_pad_pause_task (queue->srcpad);
2903     if (ret == GST_FLOW_FLUSHING) {
2904       gst_queue2_locked_flush (queue, FALSE, FALSE);
2905     } else {
2906       GST_QUEUE2_SIGNAL_DEL (queue);
2907       queue->last_query = FALSE;
2908       g_cond_signal (&queue->query_handled);
2909     }
2910     GST_QUEUE2_MUTEX_UNLOCK (queue);
2911     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2912         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
2913     /* let app know about us giving up if upstream is not expected to do so */
2914     /* EOS is already taken care of elsewhere */
2915     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
2916       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2917           (_("Internal data flow error.")),
2918           ("streaming task paused, reason %s (%d)",
2919               gst_flow_get_name (ret), ret));
2920       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
2921     }
2922     return;
2923   }
2924 }
2925
2926 static gboolean
2927 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2928 {
2929   gboolean res = TRUE;
2930   GstQueue2 *queue = GST_QUEUE2 (parent);
2931
2932 #ifndef GST_DISABLE_GST_DEBUG
2933   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
2934       event, GST_EVENT_TYPE_NAME (event));
2935 #endif
2936
2937   switch (GST_EVENT_TYPE (event)) {
2938     case GST_EVENT_FLUSH_START:
2939       if (QUEUE_IS_USING_QUEUE (queue)) {
2940         /* just forward upstream */
2941         res = gst_pad_push_event (queue->sinkpad, event);
2942       } else {
2943         /* now unblock the getrange function */
2944         GST_QUEUE2_MUTEX_LOCK (queue);
2945         GST_DEBUG_OBJECT (queue, "flushing");
2946         queue->srcresult = GST_FLOW_FLUSHING;
2947         GST_QUEUE2_SIGNAL_ADD (queue);
2948         GST_QUEUE2_MUTEX_UNLOCK (queue);
2949
2950         /* when using a temp file, we eat the event */
2951         res = TRUE;
2952         gst_event_unref (event);
2953       }
2954       break;
2955     case GST_EVENT_FLUSH_STOP:
2956       if (QUEUE_IS_USING_QUEUE (queue)) {
2957         /* just forward upstream */
2958         res = gst_pad_push_event (queue->sinkpad, event);
2959       } else {
2960         /* now unblock the getrange function */
2961         GST_QUEUE2_MUTEX_LOCK (queue);
2962         queue->srcresult = GST_FLOW_OK;
2963         GST_QUEUE2_MUTEX_UNLOCK (queue);
2964
2965         /* when using a temp file, we eat the event */
2966         res = TRUE;
2967         gst_event_unref (event);
2968       }
2969       break;
2970     case GST_EVENT_RECONFIGURE:
2971       GST_QUEUE2_MUTEX_LOCK (queue);
2972       /* assume downstream is linked now and try to push again */
2973       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
2974         queue->srcresult = GST_FLOW_OK;
2975         queue->sinkresult = GST_FLOW_OK;
2976         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
2977           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
2978               NULL);
2979         }
2980       }
2981       GST_QUEUE2_MUTEX_UNLOCK (queue);
2982
2983       res = gst_pad_push_event (queue->sinkpad, event);
2984       break;
2985     default:
2986       res = gst_pad_push_event (queue->sinkpad, event);
2987       break;
2988   }
2989
2990   return res;
2991 }
2992
2993 static gboolean
2994 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2995 {
2996   GstQueue2 *queue;
2997
2998   queue = GST_QUEUE2 (parent);
2999
3000   switch (GST_QUERY_TYPE (query)) {
3001     case GST_QUERY_POSITION:
3002     {
3003       gint64 peer_pos;
3004       GstFormat format;
3005
3006       if (!gst_pad_peer_query (queue->sinkpad, query))
3007         goto peer_failed;
3008
3009       /* get peer position */
3010       gst_query_parse_position (query, &format, &peer_pos);
3011
3012       /* FIXME: this code assumes that there's no discont in the queue */
3013       switch (format) {
3014         case GST_FORMAT_BYTES:
3015           peer_pos -= queue->cur_level.bytes;
3016           break;
3017         case GST_FORMAT_TIME:
3018           peer_pos -= queue->cur_level.time;
3019           break;
3020         default:
3021           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3022               "know how to adjust value", gst_format_get_name (format));
3023           return FALSE;
3024       }
3025       /* set updated position */
3026       gst_query_set_position (query, format, peer_pos);
3027       break;
3028     }
3029     case GST_QUERY_DURATION:
3030     {
3031       GST_DEBUG_OBJECT (queue, "doing peer query");
3032
3033       if (!gst_pad_peer_query (queue->sinkpad, query))
3034         goto peer_failed;
3035
3036       GST_DEBUG_OBJECT (queue, "peer query success");
3037       break;
3038     }
3039     case GST_QUERY_BUFFERING:
3040     {
3041       gint percent;
3042       gboolean is_buffering;
3043       GstBufferingMode mode;
3044       gint avg_in, avg_out;
3045       gint64 buffering_left;
3046
3047       GST_DEBUG_OBJECT (queue, "query buffering");
3048
3049       get_buffering_percent (queue, &is_buffering, &percent);
3050       gst_query_set_buffering_percent (query, is_buffering, percent);
3051
3052       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3053           &buffering_left);
3054       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3055           buffering_left);
3056
3057       if (!QUEUE_IS_USING_QUEUE (queue)) {
3058         /* add ranges for download and ringbuffer buffering */
3059         GstFormat format;
3060         gint64 start, stop, range_start, range_stop;
3061         guint64 writing_pos;
3062         gint64 estimated_total;
3063         gint64 duration;
3064         gboolean peer_res, is_eos;
3065         GstQueue2Range *queued_ranges;
3066
3067         /* we need a current download region */
3068         if (queue->current == NULL)
3069           return FALSE;
3070
3071         writing_pos = queue->current->writing_pos;
3072         is_eos = queue->is_eos;
3073
3074         if (is_eos) {
3075           /* we're EOS, we know the duration in bytes now */
3076           peer_res = TRUE;
3077           duration = writing_pos;
3078         } else {
3079           /* get duration of upstream in bytes */
3080           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3081               GST_FORMAT_BYTES, &duration);
3082         }
3083
3084         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3085             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3086
3087         /* calculate remaining and total download time */
3088         if (peer_res && avg_in > 0.0)
3089           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3090         else
3091           estimated_total = -1;
3092
3093         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3094             estimated_total);
3095
3096         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3097
3098         switch (format) {
3099           case GST_FORMAT_PERCENT:
3100             /* we need duration */
3101             if (!peer_res)
3102               goto peer_failed;
3103
3104             start = 0;
3105             /* get our available data relative to the duration */
3106             if (duration != -1)
3107               stop =
3108                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3109                   duration);
3110             else
3111               stop = -1;
3112             break;
3113           case GST_FORMAT_BYTES:
3114             start = 0;
3115             stop = writing_pos;
3116             break;
3117           default:
3118             start = -1;
3119             stop = -1;
3120             break;
3121         }
3122
3123         /* fill out the buffered ranges */
3124         for (queued_ranges = queue->ranges; queued_ranges;
3125             queued_ranges = queued_ranges->next) {
3126           switch (format) {
3127             case GST_FORMAT_PERCENT:
3128               if (duration == -1) {
3129                 range_start = 0;
3130                 range_stop = 0;
3131                 break;
3132               }
3133               range_start =
3134                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3135                   queued_ranges->offset, duration);
3136               range_stop =
3137                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3138                   queued_ranges->writing_pos, duration);
3139               break;
3140             case GST_FORMAT_BYTES:
3141               range_start = queued_ranges->offset;
3142               range_stop = queued_ranges->writing_pos;
3143               break;
3144             default:
3145               range_start = -1;
3146               range_stop = -1;
3147               break;
3148           }
3149           if (range_start == range_stop)
3150             continue;
3151           GST_DEBUG_OBJECT (queue,
3152               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3153               G_GINT64_FORMAT, range_start, range_stop);
3154           gst_query_add_buffering_range (query, range_start, range_stop);
3155         }
3156
3157         gst_query_set_buffering_range (query, format, start, stop,
3158             estimated_total);
3159       }
3160       break;
3161     }
3162     case GST_QUERY_SCHEDULING:
3163     {
3164       gboolean pull_mode;
3165       GstSchedulingFlags flags = 0;
3166
3167       if (!gst_pad_peer_query (queue->sinkpad, query))
3168         goto peer_failed;
3169
3170       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3171
3172       /* we can operate in pull mode when we are using a tempfile */
3173       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3174
3175       if (pull_mode)
3176         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3177       gst_query_set_scheduling (query, flags, 0, -1, 0);
3178       if (pull_mode)
3179         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3180       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3181       break;
3182     }
3183     default:
3184       /* peer handled other queries */
3185       if (!gst_pad_query_default (pad, parent, query))
3186         goto peer_failed;
3187       break;
3188   }
3189
3190   return TRUE;
3191
3192   /* ERRORS */
3193 peer_failed:
3194   {
3195     GST_DEBUG_OBJECT (queue, "failed peer query");
3196     return FALSE;
3197   }
3198 }
3199
3200 static gboolean
3201 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3202 {
3203   GstQueue2 *queue = GST_QUEUE2 (element);
3204
3205   /* simply forward to the srcpad query function */
3206   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3207       query);
3208 }
3209
3210 static void
3211 gst_queue2_update_upstream_size (GstQueue2 * queue)
3212 {
3213   gint64 upstream_size = -1;
3214
3215   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3216           &upstream_size)) {
3217     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3218
3219     /* upstream_size can be negative but queue->upstream_size is unsigned.
3220      * Prevent setting negative values to it (the query can return -1) */
3221     if (upstream_size >= 0)
3222       queue->upstream_size = upstream_size;
3223     else
3224       queue->upstream_size = 0;
3225   }
3226 }
3227
3228 static GstFlowReturn
3229 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3230     guint length, GstBuffer ** buffer)
3231 {
3232   GstQueue2 *queue;
3233   GstFlowReturn ret;
3234
3235   queue = GST_QUEUE2_CAST (parent);
3236
3237   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3238   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3239   offset = (offset == -1) ? queue->current->reading_pos : offset;
3240
3241   GST_DEBUG_OBJECT (queue,
3242       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3243
3244   /* catch any reads beyond the size of the file here to make sure queue2
3245    * doesn't send seek events beyond the size of the file upstream, since
3246    * that would confuse elements such as souphttpsrc and/or http servers.
3247    * Demuxers often just loop until EOS at the end of the file to figure out
3248    * when they've read all the end-headers or index chunks. */
3249   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3250     gst_queue2_update_upstream_size (queue);
3251     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3252       goto out_unexpected;
3253   }
3254
3255   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3256     gst_queue2_update_upstream_size (queue);
3257     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3258       length = queue->upstream_size - offset;
3259       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3260     }
3261   }
3262
3263   /* FIXME - function will block when the range is not yet available */
3264   ret = gst_queue2_create_read (queue, offset, length, buffer);
3265   GST_QUEUE2_MUTEX_UNLOCK (queue);
3266   gst_queue2_post_buffering (queue);
3267
3268   return ret;
3269
3270   /* ERRORS */
3271 out_flushing:
3272   {
3273     ret = queue->srcresult;
3274
3275     GST_DEBUG_OBJECT (queue, "we are flushing");
3276     GST_QUEUE2_MUTEX_UNLOCK (queue);
3277     return ret;
3278   }
3279 out_unexpected:
3280   {
3281     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3282     GST_QUEUE2_MUTEX_UNLOCK (queue);
3283     return GST_FLOW_EOS;
3284   }
3285 }
3286
3287 /* sink currently only operates in push mode */
3288 static gboolean
3289 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3290     GstPadMode mode, gboolean active)
3291 {
3292   gboolean result;
3293   GstQueue2 *queue;
3294
3295   queue = GST_QUEUE2 (parent);
3296
3297   switch (mode) {
3298     case GST_PAD_MODE_PUSH:
3299       if (active) {
3300         GST_QUEUE2_MUTEX_LOCK (queue);
3301         GST_DEBUG_OBJECT (queue, "activating push mode");
3302         queue->srcresult = GST_FLOW_OK;
3303         queue->sinkresult = GST_FLOW_OK;
3304         queue->is_eos = FALSE;
3305         queue->unexpected = FALSE;
3306         reset_rate_timer (queue);
3307         GST_QUEUE2_MUTEX_UNLOCK (queue);
3308       } else {
3309         /* unblock chain function */
3310         GST_QUEUE2_MUTEX_LOCK (queue);
3311         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3312         queue->srcresult = GST_FLOW_FLUSHING;
3313         queue->sinkresult = GST_FLOW_FLUSHING;
3314         GST_QUEUE2_SIGNAL_DEL (queue);
3315         /* Unblock query handler */
3316         queue->last_query = FALSE;
3317         g_cond_signal (&queue->query_handled);
3318         GST_QUEUE2_MUTEX_UNLOCK (queue);
3319
3320         /* wait until it is unblocked and clean up */
3321         GST_PAD_STREAM_LOCK (pad);
3322         GST_QUEUE2_MUTEX_LOCK (queue);
3323         gst_queue2_locked_flush (queue, TRUE, FALSE);
3324         GST_QUEUE2_MUTEX_UNLOCK (queue);
3325         GST_PAD_STREAM_UNLOCK (pad);
3326       }
3327       result = TRUE;
3328       break;
3329     default:
3330       result = FALSE;
3331       break;
3332   }
3333   return result;
3334 }
3335
3336 /* src operating in push mode, we start a task on the source pad that pushes out
3337  * buffers from the queue */
3338 static gboolean
3339 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3340 {
3341   gboolean result = FALSE;
3342   GstQueue2 *queue;
3343
3344   queue = GST_QUEUE2 (parent);
3345
3346   if (active) {
3347     GST_QUEUE2_MUTEX_LOCK (queue);
3348     GST_DEBUG_OBJECT (queue, "activating push mode");
3349     queue->srcresult = GST_FLOW_OK;
3350     queue->sinkresult = GST_FLOW_OK;
3351     queue->is_eos = FALSE;
3352     queue->unexpected = FALSE;
3353     result =
3354         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3355     GST_QUEUE2_MUTEX_UNLOCK (queue);
3356   } else {
3357     /* unblock loop function */
3358     GST_QUEUE2_MUTEX_LOCK (queue);
3359     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3360     queue->srcresult = GST_FLOW_FLUSHING;
3361     queue->sinkresult = GST_FLOW_FLUSHING;
3362     /* the item add signal will unblock */
3363     GST_QUEUE2_SIGNAL_ADD (queue);
3364     GST_QUEUE2_MUTEX_UNLOCK (queue);
3365
3366     /* step 2, make sure streaming finishes */
3367     result = gst_pad_stop_task (pad);
3368   }
3369
3370   return result;
3371 }
3372
3373 /* pull mode, downstream will call our getrange function */
3374 static gboolean
3375 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3376 {
3377   gboolean result;
3378   GstQueue2 *queue;
3379
3380   queue = GST_QUEUE2 (parent);
3381
3382   if (active) {
3383     GST_QUEUE2_MUTEX_LOCK (queue);
3384     if (!QUEUE_IS_USING_QUEUE (queue)) {
3385       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3386         /* open the temp file now */
3387         result = gst_queue2_open_temp_location_file (queue);
3388       } else if (!queue->ring_buffer) {
3389         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3390         result = ! !queue->ring_buffer;
3391       } else {
3392         result = TRUE;
3393       }
3394
3395       GST_DEBUG_OBJECT (queue, "activating pull mode");
3396       init_ranges (queue);
3397       queue->srcresult = GST_FLOW_OK;
3398       queue->sinkresult = GST_FLOW_OK;
3399       queue->is_eos = FALSE;
3400       queue->unexpected = FALSE;
3401       queue->upstream_size = 0;
3402     } else {
3403       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3404       /* this is not allowed, we cannot operate in pull mode without a temp
3405        * file. */
3406       queue->srcresult = GST_FLOW_FLUSHING;
3407       queue->sinkresult = GST_FLOW_FLUSHING;
3408       result = FALSE;
3409     }
3410     GST_QUEUE2_MUTEX_UNLOCK (queue);
3411   } else {
3412     GST_QUEUE2_MUTEX_LOCK (queue);
3413     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3414     queue->srcresult = GST_FLOW_FLUSHING;
3415     queue->sinkresult = GST_FLOW_FLUSHING;
3416     /* this will unlock getrange */
3417     GST_QUEUE2_SIGNAL_ADD (queue);
3418     result = TRUE;
3419     GST_QUEUE2_MUTEX_UNLOCK (queue);
3420   }
3421
3422   return result;
3423 }
3424
3425 static gboolean
3426 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3427     gboolean active)
3428 {
3429   gboolean res;
3430
3431   switch (mode) {
3432     case GST_PAD_MODE_PULL:
3433       res = gst_queue2_src_activate_pull (pad, parent, active);
3434       break;
3435     case GST_PAD_MODE_PUSH:
3436       res = gst_queue2_src_activate_push (pad, parent, active);
3437       break;
3438     default:
3439       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3440       res = FALSE;
3441       break;
3442   }
3443   return res;
3444 }
3445
3446 static GstStateChangeReturn
3447 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3448 {
3449   GstQueue2 *queue;
3450   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3451
3452   queue = GST_QUEUE2 (element);
3453
3454   switch (transition) {
3455     case GST_STATE_CHANGE_NULL_TO_READY:
3456       break;
3457     case GST_STATE_CHANGE_READY_TO_PAUSED:
3458       GST_QUEUE2_MUTEX_LOCK (queue);
3459       if (!QUEUE_IS_USING_QUEUE (queue)) {
3460         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3461           if (!gst_queue2_open_temp_location_file (queue))
3462             ret = GST_STATE_CHANGE_FAILURE;
3463         } else {
3464           if (queue->ring_buffer) {
3465             g_free (queue->ring_buffer);
3466             queue->ring_buffer = NULL;
3467           }
3468           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3469             ret = GST_STATE_CHANGE_FAILURE;
3470         }
3471         init_ranges (queue);
3472       }
3473       queue->segment_event_received = FALSE;
3474       queue->starting_segment = NULL;
3475       gst_event_replace (&queue->stream_start_event, NULL);
3476       GST_QUEUE2_MUTEX_UNLOCK (queue);
3477       break;
3478     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3479       break;
3480     default:
3481       break;
3482   }
3483
3484   if (ret == GST_STATE_CHANGE_FAILURE)
3485     return ret;
3486
3487   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3488
3489   if (ret == GST_STATE_CHANGE_FAILURE)
3490     return ret;
3491
3492   switch (transition) {
3493     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3494       break;
3495     case GST_STATE_CHANGE_PAUSED_TO_READY:
3496       GST_QUEUE2_MUTEX_LOCK (queue);
3497       if (!QUEUE_IS_USING_QUEUE (queue)) {
3498         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3499           gst_queue2_close_temp_location_file (queue);
3500         } else if (queue->ring_buffer) {
3501           g_free (queue->ring_buffer);
3502           queue->ring_buffer = NULL;
3503         }
3504         clean_ranges (queue);
3505       }
3506       if (queue->starting_segment != NULL) {
3507         gst_event_unref (queue->starting_segment);
3508         queue->starting_segment = NULL;
3509       }
3510       gst_event_replace (&queue->stream_start_event, NULL);
3511       GST_QUEUE2_MUTEX_UNLOCK (queue);
3512       break;
3513     case GST_STATE_CHANGE_READY_TO_NULL:
3514       break;
3515     default:
3516       break;
3517   }
3518
3519   return ret;
3520 }
3521
3522 /* changing the capacity of the queue must wake up
3523  * the _chain function, it might have more room now
3524  * to store the buffer/event in the queue */
3525 #define QUEUE_CAPACITY_CHANGE(q) \
3526   GST_QUEUE2_SIGNAL_DEL (queue); \
3527   if (queue->use_buffering)      \
3528     update_buffering (queue);
3529
3530 /* Changing the minimum required fill level must
3531  * wake up the _loop function as it might now
3532  * be able to preceed.
3533  */
3534 #define QUEUE_THRESHOLD_CHANGE(q)\
3535   GST_QUEUE2_SIGNAL_ADD (queue);
3536
3537 static void
3538 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3539 {
3540   GstState state;
3541
3542   /* the element must be stopped in order to do this */
3543   GST_OBJECT_LOCK (queue);
3544   state = GST_STATE (queue);
3545   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3546     goto wrong_state;
3547   GST_OBJECT_UNLOCK (queue);
3548
3549   /* set new location */
3550   g_free (queue->temp_template);
3551   queue->temp_template = g_strdup (template);
3552
3553   return;
3554
3555 /* ERROR */
3556 wrong_state:
3557   {
3558     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3559     GST_OBJECT_UNLOCK (queue);
3560   }
3561 }
3562
3563 static void
3564 gst_queue2_set_property (GObject * object,
3565     guint prop_id, const GValue * value, GParamSpec * pspec)
3566 {
3567   GstQueue2 *queue = GST_QUEUE2 (object);
3568
3569   /* someone could change levels here, and since this
3570    * affects the get/put funcs, we need to lock for safety. */
3571   GST_QUEUE2_MUTEX_LOCK (queue);
3572
3573   switch (prop_id) {
3574     case PROP_MAX_SIZE_BYTES:
3575       queue->max_level.bytes = g_value_get_uint (value);
3576       QUEUE_CAPACITY_CHANGE (queue);
3577       break;
3578     case PROP_MAX_SIZE_BUFFERS:
3579       queue->max_level.buffers = g_value_get_uint (value);
3580       QUEUE_CAPACITY_CHANGE (queue);
3581       break;
3582     case PROP_MAX_SIZE_TIME:
3583       queue->max_level.time = g_value_get_uint64 (value);
3584       /* set rate_time to the same value. We use an extra field in the level
3585        * structure so that we can easily access and compare it */
3586       queue->max_level.rate_time = queue->max_level.time;
3587       QUEUE_CAPACITY_CHANGE (queue);
3588       break;
3589     case PROP_USE_BUFFERING:
3590       queue->use_buffering = g_value_get_boolean (value);
3591       if (!queue->use_buffering && queue->is_buffering) {
3592         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3593             "posting 100%% message");
3594         SET_PERCENT (queue, 100);
3595         queue->is_buffering = FALSE;
3596       }
3597
3598       if (queue->use_buffering) {
3599         queue->is_buffering = TRUE;
3600         update_buffering (queue);
3601       }
3602       break;
3603     case PROP_USE_RATE_ESTIMATE:
3604       queue->use_rate_estimate = g_value_get_boolean (value);
3605       break;
3606     case PROP_LOW_PERCENT:
3607       queue->low_percent = g_value_get_int (value);
3608       break;
3609     case PROP_HIGH_PERCENT:
3610       queue->high_percent = g_value_get_int (value);
3611       break;
3612     case PROP_TEMP_TEMPLATE:
3613       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3614       break;
3615     case PROP_TEMP_REMOVE:
3616       queue->temp_remove = g_value_get_boolean (value);
3617       break;
3618     case PROP_RING_BUFFER_MAX_SIZE:
3619       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3620       break;
3621     default:
3622       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3623       break;
3624   }
3625
3626   GST_QUEUE2_MUTEX_UNLOCK (queue);
3627   gst_queue2_post_buffering (queue);
3628 }
3629
3630 static void
3631 gst_queue2_get_property (GObject * object,
3632     guint prop_id, GValue * value, GParamSpec * pspec)
3633 {
3634   GstQueue2 *queue = GST_QUEUE2 (object);
3635
3636   GST_QUEUE2_MUTEX_LOCK (queue);
3637
3638   switch (prop_id) {
3639     case PROP_CUR_LEVEL_BYTES:
3640       g_value_set_uint (value, queue->cur_level.bytes);
3641       break;
3642     case PROP_CUR_LEVEL_BUFFERS:
3643       g_value_set_uint (value, queue->cur_level.buffers);
3644       break;
3645     case PROP_CUR_LEVEL_TIME:
3646       g_value_set_uint64 (value, queue->cur_level.time);
3647       break;
3648     case PROP_MAX_SIZE_BYTES:
3649       g_value_set_uint (value, queue->max_level.bytes);
3650       break;
3651     case PROP_MAX_SIZE_BUFFERS:
3652       g_value_set_uint (value, queue->max_level.buffers);
3653       break;
3654     case PROP_MAX_SIZE_TIME:
3655       g_value_set_uint64 (value, queue->max_level.time);
3656       break;
3657     case PROP_USE_BUFFERING:
3658       g_value_set_boolean (value, queue->use_buffering);
3659       break;
3660     case PROP_USE_RATE_ESTIMATE:
3661       g_value_set_boolean (value, queue->use_rate_estimate);
3662       break;
3663     case PROP_LOW_PERCENT:
3664       g_value_set_int (value, queue->low_percent);
3665       break;
3666     case PROP_HIGH_PERCENT:
3667       g_value_set_int (value, queue->high_percent);
3668       break;
3669     case PROP_TEMP_TEMPLATE:
3670       g_value_set_string (value, queue->temp_template);
3671       break;
3672     case PROP_TEMP_LOCATION:
3673       g_value_set_string (value, queue->temp_location);
3674       break;
3675     case PROP_TEMP_REMOVE:
3676       g_value_set_boolean (value, queue->temp_remove);
3677       break;
3678     case PROP_RING_BUFFER_MAX_SIZE:
3679       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3680       break;
3681     case PROP_AVG_IN_RATE:
3682     {
3683       gdouble in_rate = queue->byte_in_rate;
3684
3685       /* During the first RATE_INTERVAL, byte_in_rate will not have been
3686        * calculated, so calculate it here. */
3687       if (in_rate == 0.0 && queue->bytes_in
3688           && queue->last_update_in_rates_elapsed > 0.0)
3689         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
3690
3691       g_value_set_int64 (value, (gint64) in_rate);
3692       break;
3693     }
3694     default:
3695       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3696       break;
3697   }
3698
3699   GST_QUEUE2_MUTEX_UNLOCK (queue);
3700 }