Merge remote-tracking branch 'origin/master' into 0.11
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue2:current-level-buffers property.
40  *
41  * The default queue size limits are 100 buffers, 2MB of data, or
42  * two seconds worth of data, whichever is reached first.
43  *
44  * If you set temp-tmpl to a value such as /tmp/gstreamer-XXXXXX, the element
45  * will allocate a random free filename and buffer data in the file.
46  * By using this, it will buffer the entire stream data on the file independently
47  * of the queue size limits, they will only be used for buffering statistics.
48  *
49  * Since 0.10.24, setting the temp-location property with a filename is deprecated
50  * because it's impossible to securely open a temporary file in this way. The
51  * property will still be used to notify the application of the allocated
52  * filename, though.
53  *
54  * Last reviewed on 2009-07-10 (0.10.24)
55  */
56
57 #ifdef HAVE_CONFIG_H
58 #include "config.h"
59 #endif
60
61 #include "gstqueue2.h"
62
63 #include <glib/gstdio.h>
64
65 #include "gst/gst-i18n-lib.h"
66
67 #include <string.h>
68
69 #ifdef G_OS_WIN32
70 #include <io.h>                 /* lseek, open, close, read */
71 #undef lseek
72 #define lseek _lseeki64
73 #undef off_t
74 #define off_t guint64
75 #else
76 #include <unistd.h>
77 #endif
78
79 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
80     GST_PAD_SINK,
81     GST_PAD_ALWAYS,
82     GST_STATIC_CAPS_ANY);
83
84 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
85     GST_PAD_SRC,
86     GST_PAD_ALWAYS,
87     GST_STATIC_CAPS_ANY);
88
89 GST_DEBUG_CATEGORY_STATIC (queue_debug);
90 #define GST_CAT_DEFAULT (queue_debug)
91 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
92
93 enum
94 {
95   LAST_SIGNAL
96 };
97
98 /* other defines */
99 #define DEFAULT_BUFFER_SIZE 4096
100 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
101 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
102 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
103
104 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
105
106 /* default property values */
107 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
108 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
109 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
110 #define DEFAULT_USE_BUFFERING      FALSE
111 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
112 #define DEFAULT_LOW_PERCENT        10
113 #define DEFAULT_HIGH_PERCENT       99
114 #define DEFAULT_TEMP_REMOVE        TRUE
115 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
116
117 enum
118 {
119   PROP_0,
120   PROP_CUR_LEVEL_BUFFERS,
121   PROP_CUR_LEVEL_BYTES,
122   PROP_CUR_LEVEL_TIME,
123   PROP_MAX_SIZE_BUFFERS,
124   PROP_MAX_SIZE_BYTES,
125   PROP_MAX_SIZE_TIME,
126   PROP_USE_BUFFERING,
127   PROP_USE_RATE_ESTIMATE,
128   PROP_LOW_PERCENT,
129   PROP_HIGH_PERCENT,
130   PROP_TEMP_TEMPLATE,
131   PROP_TEMP_LOCATION,
132   PROP_TEMP_REMOVE,
133   PROP_RING_BUFFER_MAX_SIZE,
134   PROP_LAST
135 };
136
137 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
138   l.buffers = 0;                                        \
139   l.bytes = 0;                                          \
140   l.time = 0;                                           \
141   l.rate_time = 0;                                      \
142 } G_STMT_END
143
144 #define STATUS(queue, pad, msg) \
145   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
146                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
147                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
148                       " ns, %"G_GUINT64_FORMAT" items", \
149                       GST_DEBUG_PAD_NAME (pad), \
150                       queue->cur_level.buffers, \
151                       queue->max_level.buffers, \
152                       queue->cur_level.bytes, \
153                       queue->max_level.bytes, \
154                       queue->cur_level.time, \
155                       queue->max_level.time, \
156                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
157                         queue->current->writing_pos - queue->current->max_reading_pos : \
158                         queue->queue.length))
159
160 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
161   g_mutex_lock (q->qlock);                                              \
162 } G_STMT_END
163
164 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
165   GST_QUEUE2_MUTEX_LOCK (q);                                            \
166   if (res != GST_FLOW_OK)                                               \
167     goto label;                                                         \
168 } G_STMT_END
169
170 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
171   g_mutex_unlock (q->qlock);                                            \
172 } G_STMT_END
173
174 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
175   STATUS (queue, q->sinkpad, "wait for DEL");                           \
176   q->waiting_del = TRUE;                                                \
177   g_cond_wait (q->item_del, queue->qlock);                              \
178   q->waiting_del = FALSE;                                               \
179   if (res != GST_FLOW_OK) {                                             \
180     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
181     goto label;                                                         \
182   }                                                                     \
183   STATUS (queue, q->sinkpad, "received DEL");                           \
184 } G_STMT_END
185
186 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
187   STATUS (queue, q->srcpad, "wait for ADD");                            \
188   q->waiting_add = TRUE;                                                \
189   g_cond_wait (q->item_add, q->qlock);                                  \
190   q->waiting_add = FALSE;                                               \
191   if (res != GST_FLOW_OK) {                                             \
192     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
193     goto label;                                                         \
194   }                                                                     \
195   STATUS (queue, q->srcpad, "received ADD");                            \
196 } G_STMT_END
197
198 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
199   if (q->waiting_del) {                                                 \
200     STATUS (q, q->srcpad, "signal DEL");                                \
201     g_cond_signal (q->item_del);                                        \
202   }                                                                     \
203 } G_STMT_END
204
205 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
206   if (q->waiting_add) {                                                 \
207     STATUS (q, q->sinkpad, "signal ADD");                               \
208     g_cond_signal (q->item_add);                                        \
209   }                                                                     \
210 } G_STMT_END
211
212 #define _do_init \
213     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
214     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
215         "dataflow inside the queue element");
216 #define gst_queue2_parent_class parent_class
217 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
218
219 static void gst_queue2_finalize (GObject * object);
220
221 static void gst_queue2_set_property (GObject * object,
222     guint prop_id, const GValue * value, GParamSpec * pspec);
223 static void gst_queue2_get_property (GObject * object,
224     guint prop_id, GValue * value, GParamSpec * pspec);
225
226 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
227 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
228 static void gst_queue2_loop (GstPad * pad);
229
230 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event);
231 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstQuery * query);
232
233 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event);
234 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query);
235 static gboolean gst_queue2_handle_query (GstElement * element,
236     GstQuery * query);
237
238 static GstCaps *gst_queue2_getcaps (GstPad * pad, GstCaps * filter);
239
240 static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset,
241     guint length, GstBuffer ** buffer);
242
243 static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active);
244 static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active);
245 static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active);
246 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
247     GstStateChange transition);
248
249 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
250 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
251
252 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
253
254 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
255
256 static void
257 gst_queue2_class_init (GstQueue2Class * klass)
258 {
259   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
260   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
261
262   gobject_class->set_property = gst_queue2_set_property;
263   gobject_class->get_property = gst_queue2_get_property;
264
265   /* properties */
266   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
267       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
268           "Current amount of data in the queue (bytes)",
269           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
270   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
271       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
272           "Current number of buffers in the queue",
273           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
274   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
275       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
276           "Current amount of data in the queue (in ns)",
277           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
278
279   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
280       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
281           "Max. amount of data in the queue (bytes, 0=disable)",
282           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
283           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
284   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
285       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
286           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
287           DEFAULT_MAX_SIZE_BUFFERS,
288           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
289   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
290       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
291           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
292           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
293
294   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
295       g_param_spec_boolean ("use-buffering", "Use buffering",
296           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
297           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
299       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
300           "Estimate the bitrate of the stream to calculate time level",
301           DEFAULT_USE_RATE_ESTIMATE,
302           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
303   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
304       g_param_spec_int ("low-percent", "Low percent",
305           "Low threshold for buffering to start. Only used if use-buffering is True",
306           0, 100, DEFAULT_LOW_PERCENT,
307           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
309       g_param_spec_int ("high-percent", "High percent",
310           "High threshold for buffering to finish. Only used if use-buffering is True",
311           0, 100, DEFAULT_HIGH_PERCENT,
312           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
313
314   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
315       g_param_spec_string ("temp-template", "Temporary File Template",
316           "File template to store temporary files in, should contain directory "
317           "and XXXXXX. (NULL == disabled)",
318           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
319
320   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
321       g_param_spec_string ("temp-location", "Temporary File Location",
322           "Location to store temporary files in (Deprecated: Only read this "
323           "property, use temp-template to configure the name template)",
324           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
325
326   /**
327    * GstQueue2:temp-remove
328    *
329    * When temp-template is set, remove the temporary file when going to READY.
330    *
331    * Since: 0.10.26
332    */
333   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
334       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
335           "Remove the temp-location after use",
336           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
337
338   /**
339    * GstQueue2:ring-buffer-max-size
340    *
341    * The maximum size of the ring buffer in bytes. If set to 0, the ring
342    * buffer is disabled. Default 0.
343    *
344    * Since: 0.10.31
345    */
346   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
347       g_param_spec_uint64 ("ring-buffer-max-size",
348           "Max. ring buffer size (bytes)",
349           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
350           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
351           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
352
353   /* set several parent class virtual functions */
354   gobject_class->finalize = gst_queue2_finalize;
355
356   gst_element_class_add_pad_template (gstelement_class,
357       gst_static_pad_template_get (&srctemplate));
358   gst_element_class_add_pad_template (gstelement_class,
359       gst_static_pad_template_get (&sinktemplate));
360
361   gst_element_class_set_details_simple (gstelement_class, "Queue 2",
362       "Generic",
363       "Simple data queue",
364       "Erik Walthinsen <omega@cse.ogi.edu>, "
365       "Wim Taymans <wim.taymans@gmail.com>");
366
367   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
368   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
369 }
370
371 static void
372 gst_queue2_init (GstQueue2 * queue)
373 {
374   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
375
376   gst_pad_set_chain_function (queue->sinkpad,
377       GST_DEBUG_FUNCPTR (gst_queue2_chain));
378   gst_pad_set_activatepush_function (queue->sinkpad,
379       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
380   gst_pad_set_event_function (queue->sinkpad,
381       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
382   gst_pad_set_query_function (queue->sinkpad,
383       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
384   gst_pad_set_getcaps_function (queue->sinkpad,
385       GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
386   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
387
388   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
389
390   gst_pad_set_activatepull_function (queue->srcpad,
391       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_pull));
392   gst_pad_set_activatepush_function (queue->srcpad,
393       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push));
394   gst_pad_set_getrange_function (queue->srcpad,
395       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
396   gst_pad_set_getcaps_function (queue->srcpad,
397       GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
398   gst_pad_set_event_function (queue->srcpad,
399       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
400   gst_pad_set_query_function (queue->srcpad,
401       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
402   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
403
404   /* levels */
405   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
406   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
407   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
408   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
409   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
410   queue->use_buffering = DEFAULT_USE_BUFFERING;
411   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
412   queue->low_percent = DEFAULT_LOW_PERCENT;
413   queue->high_percent = DEFAULT_HIGH_PERCENT;
414
415   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
416   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
417
418   queue->sinktime = GST_CLOCK_TIME_NONE;
419   queue->srctime = GST_CLOCK_TIME_NONE;
420   queue->sink_tainted = TRUE;
421   queue->src_tainted = TRUE;
422
423   queue->srcresult = GST_FLOW_WRONG_STATE;
424   queue->sinkresult = GST_FLOW_WRONG_STATE;
425   queue->is_eos = FALSE;
426   queue->in_timer = g_timer_new ();
427   queue->out_timer = g_timer_new ();
428
429   queue->qlock = g_mutex_new ();
430   queue->waiting_add = FALSE;
431   queue->item_add = g_cond_new ();
432   queue->waiting_del = FALSE;
433   queue->item_del = g_cond_new ();
434   g_queue_init (&queue->queue);
435
436   queue->buffering_percent = 100;
437
438   /* tempfile related */
439   queue->temp_template = NULL;
440   queue->temp_location = NULL;
441   queue->temp_location_set = FALSE;
442   queue->temp_remove = DEFAULT_TEMP_REMOVE;
443
444   queue->ring_buffer = NULL;
445   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
446
447   GST_DEBUG_OBJECT (queue,
448       "initialized queue's not_empty & not_full conditions");
449 }
450
451 /* called only once, as opposed to dispose */
452 static void
453 gst_queue2_finalize (GObject * object)
454 {
455   GstQueue2 *queue = GST_QUEUE2 (object);
456
457   GST_DEBUG_OBJECT (queue, "finalizing queue");
458
459   while (!g_queue_is_empty (&queue->queue)) {
460     GstMiniObject *data = g_queue_pop_head (&queue->queue);
461
462     gst_mini_object_unref (data);
463   }
464
465   g_queue_clear (&queue->queue);
466   g_mutex_free (queue->qlock);
467   g_cond_free (queue->item_add);
468   g_cond_free (queue->item_del);
469   g_timer_destroy (queue->in_timer);
470   g_timer_destroy (queue->out_timer);
471
472   /* temp_file path cleanup  */
473   g_free (queue->temp_template);
474   g_free (queue->temp_location);
475
476   G_OBJECT_CLASS (parent_class)->finalize (object);
477 }
478
479 static void
480 debug_ranges (GstQueue2 * queue)
481 {
482   GstQueue2Range *walk;
483
484   for (walk = queue->ranges; walk; walk = walk->next) {
485     GST_DEBUG_OBJECT (queue,
486         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
487         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
488         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
489         walk->rb_writing_pos, walk->reading_pos,
490         walk == queue->current ? "**y**" : "  n  ");
491   }
492 }
493
494 /* clear all the downloaded ranges */
495 static void
496 clean_ranges (GstQueue2 * queue)
497 {
498   GST_DEBUG_OBJECT (queue, "clean queue ranges");
499
500   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
501   queue->ranges = NULL;
502   queue->current = NULL;
503 }
504
505 /* find a range that contains @offset or NULL when nothing does */
506 static GstQueue2Range *
507 find_range (GstQueue2 * queue, guint64 offset)
508 {
509   GstQueue2Range *range = NULL;
510   GstQueue2Range *walk;
511
512   /* first do a quick check for the current range */
513   for (walk = queue->ranges; walk; walk = walk->next) {
514     if (offset >= walk->offset && offset <= walk->writing_pos) {
515       /* we can reuse an existing range */
516       range = walk;
517       break;
518     }
519   }
520   if (range) {
521     GST_DEBUG_OBJECT (queue,
522         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
523         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
524   } else {
525     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
526   }
527   return range;
528 }
529
530 static void
531 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
532 {
533   guint64 max_reading_pos, writing_pos;
534
535   writing_pos = range->writing_pos;
536   max_reading_pos = range->max_reading_pos;
537
538   if (writing_pos > max_reading_pos)
539     queue->cur_level.bytes = writing_pos - max_reading_pos;
540   else
541     queue->cur_level.bytes = 0;
542 }
543
544 /* make a new range for @offset or reuse an existing range */
545 static GstQueue2Range *
546 add_range (GstQueue2 * queue, guint64 offset)
547 {
548   GstQueue2Range *range, *prev, *next;
549
550   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
551
552   if ((range = find_range (queue, offset))) {
553     GST_DEBUG_OBJECT (queue,
554         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
555         range->writing_pos);
556     range->writing_pos = offset;
557   } else {
558     GST_DEBUG_OBJECT (queue,
559         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
560
561     range = g_slice_new0 (GstQueue2Range);
562     range->offset = offset;
563     /* we want to write to the next location in the ring buffer */
564     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
565     range->writing_pos = offset;
566     range->rb_writing_pos = range->rb_offset;
567     range->reading_pos = offset;
568     range->max_reading_pos = offset;
569
570     /* insert sorted */
571     prev = NULL;
572     next = queue->ranges;
573     while (next) {
574       if (next->offset > offset) {
575         /* insert before next */
576         GST_DEBUG_OBJECT (queue,
577             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
578             next->offset);
579         break;
580       }
581       /* try next */
582       prev = next;
583       next = next->next;
584     }
585     range->next = next;
586     if (prev)
587       prev->next = range;
588     else
589       queue->ranges = range;
590   }
591   debug_ranges (queue);
592
593   /* update the stats for this range */
594   update_cur_level (queue, range);
595
596   return range;
597 }
598
599
600 /* clear and init the download ranges for offset 0 */
601 static void
602 init_ranges (GstQueue2 * queue)
603 {
604   GST_DEBUG_OBJECT (queue, "init queue ranges");
605
606   /* get rid of all the current ranges */
607   clean_ranges (queue);
608   /* make a range for offset 0 */
609   queue->current = add_range (queue, 0);
610 }
611
612 static GstCaps *
613 gst_queue2_getcaps (GstPad * pad, GstCaps * filter)
614 {
615   GstQueue2 *queue;
616   GstPad *otherpad;
617   GstCaps *result;
618
619   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
620   if (G_UNLIKELY (queue == NULL))
621     return (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
622
623   otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
624   result = gst_pad_peer_get_caps (otherpad, filter);
625   if (result == NULL)
626     result = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
627
628   gst_object_unref (queue);
629
630   return result;
631 }
632
633 /* calculate the diff between running time on the sink and src of the queue.
634  * This is the total amount of time in the queue. */
635 static void
636 update_time_level (GstQueue2 * queue)
637 {
638   if (queue->sink_tainted) {
639     queue->sinktime =
640         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
641         queue->sink_segment.position);
642     queue->sink_tainted = FALSE;
643   }
644
645   if (queue->src_tainted) {
646     queue->srctime =
647         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
648         queue->src_segment.position);
649     queue->src_tainted = FALSE;
650   }
651
652   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
653       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
654
655   if (queue->sinktime != GST_CLOCK_TIME_NONE
656       && queue->srctime != GST_CLOCK_TIME_NONE
657       && queue->sinktime >= queue->srctime)
658     queue->cur_level.time = queue->sinktime - queue->srctime;
659   else
660     queue->cur_level.time = 0;
661 }
662
663 /* take a SEGMENT event and apply the values to segment, updating the time
664  * level of queue. */
665 static void
666 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
667     gboolean is_sink)
668 {
669   gst_event_copy_segment (event, segment);
670
671   if (segment->format == GST_FORMAT_BYTES) {
672     if (QUEUE_IS_USING_TEMP_FILE (queue)) {
673       /* start is where we'll be getting from and as such writing next */
674       queue->current = add_range (queue, segment->start);
675       /* update the stats for this range */
676       update_cur_level (queue, queue->current);
677     }
678   }
679
680   /* now configure the values, we use these to track timestamps on the
681    * sinkpad. */
682   if (segment->format != GST_FORMAT_TIME) {
683     /* non-time format, pretent the current time segment is closed with a
684      * 0 start and unknown stop time. */
685     segment->format = GST_FORMAT_TIME;
686     segment->start = 0;
687     segment->stop = -1;
688     segment->time = 0;
689   }
690
691   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
692
693   if (is_sink)
694     queue->sink_tainted = TRUE;
695   else
696     queue->src_tainted = TRUE;
697
698   /* segment can update the time level of the queue */
699   update_time_level (queue);
700 }
701
702 /* take a buffer and update segment, updating the time level of the queue. */
703 static void
704 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
705     gboolean is_sink)
706 {
707   GstClockTime duration, timestamp;
708
709   timestamp = GST_BUFFER_TIMESTAMP (buffer);
710   duration = GST_BUFFER_DURATION (buffer);
711
712   /* if no timestamp is set, assume it's continuous with the previous
713    * time */
714   if (timestamp == GST_CLOCK_TIME_NONE)
715     timestamp = segment->position;
716
717   /* add duration */
718   if (duration != GST_CLOCK_TIME_NONE)
719     timestamp += duration;
720
721   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
722       GST_TIME_ARGS (timestamp));
723
724   segment->position = timestamp;
725
726   if (is_sink)
727     queue->sink_tainted = TRUE;
728   else
729     queue->src_tainted = TRUE;
730
731   /* calc diff with other end */
732   update_time_level (queue);
733 }
734
735 static void
736 update_buffering (GstQueue2 * queue)
737 {
738   gint64 percent;
739   gboolean post = FALSE;
740
741   if (queue->high_percent <= 0)
742     return;
743
744 #define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
745
746   if (queue->is_eos) {
747     /* on EOS we are always 100% full, we set the var here so that it we can
748      * reuse the logic below to stop buffering */
749     percent = 100;
750     GST_LOG_OBJECT (queue, "we are EOS");
751   } else {
752     /* figure out the percent we are filled, we take the max of all formats. */
753
754     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
755       percent = GET_PERCENT (bytes, 0);
756     } else {
757       guint64 rb_size = queue->ring_buffer_max_size;
758       percent = GET_PERCENT (bytes, rb_size);
759     }
760     percent = MAX (percent, GET_PERCENT (time, 0));
761     percent = MAX (percent, GET_PERCENT (buffers, 0));
762
763     /* also apply the rate estimate when we need to */
764     if (queue->use_rate_estimate)
765       percent = MAX (percent, GET_PERCENT (rate_time, 0));
766   }
767
768   if (queue->is_buffering) {
769     post = TRUE;
770     /* if we were buffering see if we reached the high watermark */
771     if (percent >= queue->high_percent)
772       queue->is_buffering = FALSE;
773   } else {
774     /* we were not buffering, check if we need to start buffering if we drop
775      * below the low threshold */
776     if (percent < queue->low_percent) {
777       queue->is_buffering = TRUE;
778       queue->buffering_iteration++;
779       post = TRUE;
780     }
781   }
782   if (post) {
783     GstMessage *message;
784     GstBufferingMode mode;
785     gint64 buffering_left = -1;
786
787     /* scale to high percent so that it becomes the 100% mark */
788     percent = percent * 100 / queue->high_percent;
789     /* clip */
790     if (percent > 100)
791       percent = 100;
792
793     if (percent != queue->buffering_percent) {
794       queue->buffering_percent = percent;
795
796       if (!QUEUE_IS_USING_QUEUE (queue)) {
797         gint64 duration;
798
799         if (QUEUE_IS_USING_RING_BUFFER (queue))
800           mode = GST_BUFFERING_TIMESHIFT;
801         else
802           mode = GST_BUFFERING_DOWNLOAD;
803
804         if (queue->byte_in_rate > 0) {
805           if (gst_pad_query_peer_duration (queue->sinkpad, GST_FORMAT_BYTES,
806                   &duration)) {
807             buffering_left =
808                 (gdouble) ((duration -
809                     queue->current->writing_pos) * 1000) / queue->byte_in_rate;
810           }
811         } else {
812           buffering_left = G_MAXINT64;
813         }
814       } else {
815         mode = GST_BUFFERING_STREAM;
816       }
817
818       GST_DEBUG_OBJECT (queue, "buffering %d percent", (gint) percent);
819       message = gst_message_new_buffering (GST_OBJECT_CAST (queue),
820           (gint) percent);
821       gst_message_set_buffering_stats (message, mode,
822           queue->byte_in_rate, queue->byte_out_rate, buffering_left);
823
824       gst_element_post_message (GST_ELEMENT_CAST (queue), message);
825     }
826   } else {
827     GST_DEBUG_OBJECT (queue, "filled %d percent", (gint) percent);
828   }
829
830 #undef GET_PERCENT
831 }
832
833 static void
834 reset_rate_timer (GstQueue2 * queue)
835 {
836   queue->bytes_in = 0;
837   queue->bytes_out = 0;
838   queue->byte_in_rate = 0.0;
839   queue->byte_in_period = 0;
840   queue->byte_out_rate = 0.0;
841   queue->last_in_elapsed = 0.0;
842   queue->last_out_elapsed = 0.0;
843   queue->in_timer_started = FALSE;
844   queue->out_timer_started = FALSE;
845 }
846
847 /* the interval in seconds to recalculate the rate */
848 #define RATE_INTERVAL    0.2
849 /* Tuning for rate estimation. We use a large window for the input rate because
850  * it should be stable when connected to a network. The output rate is less
851  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
852  * therefore adapt more quickly.
853  * However, initial input rate may be subject to a burst, and should therefore
854  * initially also adapt more quickly to changes, and only later on give higher
855  * weight to previous values. */
856 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
857 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
858
859 static void
860 update_in_rates (GstQueue2 * queue)
861 {
862   gdouble elapsed, period;
863   gdouble byte_in_rate;
864
865   if (!queue->in_timer_started) {
866     queue->in_timer_started = TRUE;
867     g_timer_start (queue->in_timer);
868     return;
869   }
870
871   elapsed = g_timer_elapsed (queue->in_timer, NULL);
872
873   /* recalc after each interval. */
874   if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
875     period = elapsed - queue->last_in_elapsed;
876
877     GST_DEBUG_OBJECT (queue,
878         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
879         period, queue->bytes_in, queue->byte_in_period);
880
881     byte_in_rate = queue->bytes_in / period;
882
883     if (queue->byte_in_rate == 0.0)
884       queue->byte_in_rate = byte_in_rate;
885     else
886       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
887           (double) queue->byte_in_period, period);
888
889     /* another data point, cap at 16 for long time running average */
890     if (queue->byte_in_period < 16 * RATE_INTERVAL)
891       queue->byte_in_period += period;
892
893     /* reset the values to calculate rate over the next interval */
894     queue->last_in_elapsed = elapsed;
895     queue->bytes_in = 0;
896   }
897
898   if (queue->byte_in_rate > 0.0) {
899     queue->cur_level.rate_time =
900         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
901   }
902   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
903       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
904 }
905
906 static void
907 update_out_rates (GstQueue2 * queue)
908 {
909   gdouble elapsed, period;
910   gdouble byte_out_rate;
911
912   if (!queue->out_timer_started) {
913     queue->out_timer_started = TRUE;
914     g_timer_start (queue->out_timer);
915     return;
916   }
917
918   elapsed = g_timer_elapsed (queue->out_timer, NULL);
919
920   /* recalc after each interval. */
921   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
922     period = elapsed - queue->last_out_elapsed;
923
924     GST_DEBUG_OBJECT (queue,
925         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
926
927     byte_out_rate = queue->bytes_out / period;
928
929     if (queue->byte_out_rate == 0.0)
930       queue->byte_out_rate = byte_out_rate;
931     else
932       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
933
934     /* reset the values to calculate rate over the next interval */
935     queue->last_out_elapsed = elapsed;
936     queue->bytes_out = 0;
937   }
938   if (queue->byte_in_rate > 0.0) {
939     queue->cur_level.rate_time =
940         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
941   }
942   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
943       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
944 }
945
946 static void
947 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
948 {
949   guint64 reading_pos, max_reading_pos;
950
951   reading_pos = pos;
952   max_reading_pos = range->max_reading_pos;
953
954   max_reading_pos = MAX (max_reading_pos, reading_pos);
955
956   GST_DEBUG_OBJECT (queue,
957       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
958       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
959   range->max_reading_pos = max_reading_pos;
960
961   update_cur_level (queue, range);
962 }
963
964 static gboolean
965 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
966 {
967   GstEvent *event;
968   gboolean res;
969
970   GST_QUEUE2_MUTEX_UNLOCK (queue);
971
972   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
973
974   event =
975       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
976       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
977       GST_SEEK_TYPE_NONE, -1);
978
979   res = gst_pad_push_event (queue->sinkpad, event);
980   GST_QUEUE2_MUTEX_LOCK (queue);
981
982   if (res)
983     queue->current = add_range (queue, offset);
984
985   return res;
986 }
987
988 /* see if there is enough data in the file to read a full buffer */
989 static gboolean
990 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
991 {
992   GstQueue2Range *range;
993
994   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
995       offset, length);
996
997   if ((range = find_range (queue, offset))) {
998     if (queue->current != range) {
999       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1000       perform_seek_to_offset (queue, range->writing_pos);
1001     }
1002
1003     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1004         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1005
1006     /* we have a range for offset */
1007     GST_DEBUG_OBJECT (queue,
1008         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1009         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1010
1011     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1012       return TRUE;
1013
1014     if (offset + length <= range->writing_pos)
1015       return TRUE;
1016     else
1017       GST_DEBUG_OBJECT (queue,
1018           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1019           (offset + length) - range->writing_pos);
1020
1021   } else {
1022     GST_INFO_OBJECT (queue, "not found in any range");
1023     /* we don't have the range, see how far away we are, FIXME, find a good
1024      * threshold based on the incoming rate. */
1025     if (!queue->is_eos && queue->current) {
1026       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1027         if (offset < queue->current->offset || offset >
1028             queue->current->writing_pos + QUEUE_MAX_BYTES (queue) -
1029             queue->cur_level.bytes) {
1030           perform_seek_to_offset (queue, offset);
1031         } else {
1032           GST_INFO_OBJECT (queue,
1033               "requested data is within range, wait for data");
1034         }
1035       } else if (offset < queue->current->writing_pos + 200000) {
1036         update_cur_pos (queue, queue->current, offset + length);
1037         GST_INFO_OBJECT (queue, "wait for data");
1038         return FALSE;
1039       }
1040     }
1041
1042     /* too far away, do a seek */
1043     perform_seek_to_offset (queue, offset);
1044   }
1045
1046   return FALSE;
1047 }
1048
1049 #ifdef HAVE_FSEEKO
1050 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1051 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1052 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1053 #else
1054 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1055 #endif
1056
1057 static GstFlowReturn
1058 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1059     guint8 * dst, gint64 * read_return)
1060 {
1061   guint8 *ring_buffer;
1062   size_t res;
1063
1064   ring_buffer = queue->ring_buffer;
1065
1066   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1067     goto seek_failed;
1068
1069   /* this should not block */
1070   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1071       length, offset);
1072   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1073     res = fread (dst, 1, length, queue->temp_file);
1074   } else {
1075     memcpy (dst, ring_buffer + offset, length);
1076     res = length;
1077   }
1078
1079   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1080
1081   if (G_UNLIKELY (res < length)) {
1082     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1083       goto could_not_read;
1084     /* check for errors or EOF */
1085     if (ferror (queue->temp_file))
1086       goto could_not_read;
1087     if (feof (queue->temp_file) && length > 0)
1088       goto eos;
1089   }
1090
1091   *read_return = res;
1092
1093   return GST_FLOW_OK;
1094
1095 seek_failed:
1096   {
1097     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1098     return GST_FLOW_ERROR;
1099   }
1100 could_not_read:
1101   {
1102     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1103     return GST_FLOW_ERROR;
1104   }
1105 eos:
1106   {
1107     GST_DEBUG ("non-regular file hits EOS");
1108     return GST_FLOW_EOS;
1109   }
1110 }
1111
1112 static GstFlowReturn
1113 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1114     GstBuffer ** buffer)
1115 {
1116   GstBuffer *buf;
1117   guint8 *data;
1118   guint64 file_offset;
1119   guint block_length, remaining, read_length;
1120   guint64 rb_size;
1121   guint64 rpos;
1122   GstFlowReturn ret = GST_FLOW_OK;
1123
1124   /* allocate the output buffer of the requested size */
1125   buf = gst_buffer_new_allocate (NULL, length, 0);
1126   data = gst_buffer_map (buf, NULL, NULL, GST_MAP_WRITE);
1127
1128   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1129       offset);
1130
1131   rpos = offset;
1132   rb_size = queue->ring_buffer_max_size;
1133
1134   remaining = length;
1135   while (remaining > 0) {
1136     /* configure how much/whether to read */
1137     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1138       read_length = 0;
1139
1140       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1141         guint64 level;
1142
1143         /* calculate how far away the offset is */
1144         if (queue->current->writing_pos > rpos)
1145           level = queue->current->writing_pos - rpos;
1146         else
1147           level = 0;
1148
1149         GST_DEBUG_OBJECT (queue,
1150             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1151             ", level %" G_GUINT64_FORMAT,
1152             rpos, queue->current->writing_pos, level);
1153
1154         if (level >= rb_size) {
1155           /* we don't have the data but if we have a ring buffer that is full, we
1156            * need to read */
1157           GST_DEBUG_OBJECT (queue,
1158               "ring buffer full, reading ring-buffer-max-size %"
1159               G_GUINT64_FORMAT " bytes", rb_size);
1160           read_length = rb_size;
1161         } else if (queue->is_eos) {
1162           /* won't get any more data so read any data we have */
1163           if (level) {
1164             GST_DEBUG_OBJECT (queue,
1165                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1166                 level);
1167             read_length = level;
1168           } else
1169             goto hit_eos;
1170         }
1171       }
1172
1173       if (read_length == 0) {
1174         if (QUEUE_IS_USING_RING_BUFFER (queue)
1175             && queue->current->max_reading_pos > rpos) {
1176           /* protect cached data (data between offset and max_reading_pos)
1177            * and update current level */
1178           GST_DEBUG_OBJECT (queue,
1179               "protecting cached data [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1180               "]", rpos, queue->current->max_reading_pos);
1181           queue->current->max_reading_pos = rpos;
1182           update_cur_level (queue, queue->current);
1183         }
1184         GST_DEBUG_OBJECT (queue, "waiting for add");
1185         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1186         continue;
1187       }
1188     } else {
1189       /* we have the requested data so read it */
1190       read_length = remaining;
1191     }
1192
1193     /* set range reading_pos to actual reading position for this read */
1194     queue->current->reading_pos = rpos;
1195
1196     /* configure how much and from where to read */
1197     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1198       file_offset =
1199           (queue->current->rb_offset + (rpos -
1200               queue->current->offset)) % rb_size;
1201       if (file_offset + read_length > rb_size) {
1202         block_length = rb_size - file_offset;
1203       } else {
1204         block_length = read_length;
1205       }
1206     } else {
1207       file_offset = rpos;
1208       block_length = read_length;
1209     }
1210
1211     /* while we still have data to read, we loop */
1212     while (read_length > 0) {
1213       gint64 read_return;
1214
1215       ret =
1216           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1217           data, &read_return);
1218       if (ret != GST_FLOW_OK)
1219         goto read_error;
1220
1221       file_offset += read_return;
1222       if (QUEUE_IS_USING_RING_BUFFER (queue))
1223         file_offset %= rb_size;
1224
1225       data += read_return;
1226       read_length -= read_return;
1227       block_length = read_length;
1228       remaining -= read_return;
1229
1230       rpos = (queue->current->reading_pos += read_return);
1231       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1232     }
1233     GST_QUEUE2_SIGNAL_DEL (queue);
1234     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1235   }
1236
1237   gst_buffer_unmap (buf, data, length);
1238
1239   GST_BUFFER_OFFSET (buf) = offset;
1240   GST_BUFFER_OFFSET_END (buf) = offset + length;
1241
1242   *buffer = buf;
1243
1244   return ret;
1245
1246   /* ERRORS */
1247 hit_eos:
1248   {
1249     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1250     gst_buffer_unref (buf);
1251     return GST_FLOW_EOS;
1252   }
1253 out_flushing:
1254   {
1255     GST_DEBUG_OBJECT (queue, "we are flushing");
1256     gst_buffer_unref (buf);
1257     return GST_FLOW_WRONG_STATE;
1258   }
1259 read_error:
1260   {
1261     GST_DEBUG_OBJECT (queue, "we have a read error");
1262     gst_buffer_unmap (buf, data, 0);
1263     gst_buffer_unref (buf);
1264     return ret;
1265   }
1266 }
1267
1268 /* should be called with QUEUE_LOCK */
1269 static GstMiniObject *
1270 gst_queue2_read_item_from_file (GstQueue2 * queue)
1271 {
1272   GstMiniObject *item;
1273
1274   if (queue->starting_segment != NULL) {
1275     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1276     queue->starting_segment = NULL;
1277   } else {
1278     GstFlowReturn ret;
1279     GstBuffer *buffer;
1280     guint64 reading_pos;
1281
1282     reading_pos = queue->current->reading_pos;
1283
1284     ret =
1285         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1286         &buffer);
1287
1288     switch (ret) {
1289       case GST_FLOW_OK:
1290         item = GST_MINI_OBJECT_CAST (buffer);
1291         break;
1292       case GST_FLOW_EOS:
1293         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1294         break;
1295       default:
1296         item = NULL;
1297         break;
1298     }
1299   }
1300   return item;
1301 }
1302
1303 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1304  * the temp filename. */
1305 static gboolean
1306 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1307 {
1308   gint fd = -1;
1309   gchar *name = NULL;
1310
1311   if (queue->temp_file)
1312     goto already_opened;
1313
1314   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1315
1316   /* we have two cases:
1317    * - temp_location was set to something !NULL (Deprecated). in this case we
1318    *   open the specified filename.
1319    * - temp_template was set, allocate a filename and open that filename
1320    */
1321   if (!queue->temp_location_set) {
1322     /* nothing to do */
1323     if (queue->temp_template == NULL)
1324       goto no_directory;
1325
1326     /* make copy of the template, we don't want to change this */
1327     name = g_strdup (queue->temp_template);
1328     fd = g_mkstemp (name);
1329     if (fd == -1)
1330       goto mkstemp_failed;
1331
1332     /* open the file for update/writing */
1333     queue->temp_file = fdopen (fd, "wb+");
1334     /* error creating file */
1335     if (queue->temp_file == NULL)
1336       goto open_failed;
1337
1338     g_free (queue->temp_location);
1339     queue->temp_location = name;
1340
1341     GST_QUEUE2_MUTEX_UNLOCK (queue);
1342
1343     /* we can't emit the notify with the lock */
1344     g_object_notify (G_OBJECT (queue), "temp-location");
1345
1346     GST_QUEUE2_MUTEX_LOCK (queue);
1347   } else {
1348     /* open the file for update/writing, this is deprecated but we still need to
1349      * support it for API/ABI compatibility */
1350     queue->temp_file = g_fopen (queue->temp_location, "wb+");
1351     /* error creating file */
1352     if (queue->temp_file == NULL)
1353       goto open_failed;
1354   }
1355   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1356
1357   return TRUE;
1358
1359   /* ERRORS */
1360 already_opened:
1361   {
1362     GST_DEBUG_OBJECT (queue, "temp file was already open");
1363     return TRUE;
1364   }
1365 no_directory:
1366   {
1367     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1368         (_("No Temp directory specified.")), (NULL));
1369     return FALSE;
1370   }
1371 mkstemp_failed:
1372   {
1373     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1374         (_("Could not create temp file \"%s\"."), queue->temp_template),
1375         GST_ERROR_SYSTEM);
1376     g_free (name);
1377     return FALSE;
1378   }
1379 open_failed:
1380   {
1381     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1382         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1383     g_free (name);
1384     if (fd != -1)
1385       close (fd);
1386     return FALSE;
1387   }
1388 }
1389
1390 static void
1391 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1392 {
1393   /* nothing to do */
1394   if (queue->temp_file == NULL)
1395     return;
1396
1397   GST_DEBUG_OBJECT (queue, "closing temp file");
1398
1399   fflush (queue->temp_file);
1400   fclose (queue->temp_file);
1401
1402   if (queue->temp_remove)
1403     remove (queue->temp_location);
1404
1405   queue->temp_file = NULL;
1406   clean_ranges (queue);
1407 }
1408
1409 static void
1410 gst_queue2_flush_temp_file (GstQueue2 * queue)
1411 {
1412   if (queue->temp_file == NULL)
1413     return;
1414
1415   GST_DEBUG_OBJECT (queue, "flushing temp file");
1416
1417   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1418 }
1419
1420 static void
1421 gst_queue2_locked_flush (GstQueue2 * queue)
1422 {
1423   if (!QUEUE_IS_USING_QUEUE (queue)) {
1424     if (QUEUE_IS_USING_TEMP_FILE (queue))
1425       gst_queue2_flush_temp_file (queue);
1426     init_ranges (queue);
1427   } else {
1428     while (!g_queue_is_empty (&queue->queue)) {
1429       GstMiniObject *data = g_queue_pop_head (&queue->queue);
1430
1431       /* Then lose another reference because we are supposed to destroy that
1432          data when flushing */
1433       gst_mini_object_unref (data);
1434     }
1435   }
1436   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1437   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1438   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1439   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1440   queue->sink_tainted = queue->src_tainted = TRUE;
1441   if (queue->starting_segment != NULL)
1442     gst_event_unref (queue->starting_segment);
1443   queue->starting_segment = NULL;
1444   queue->segment_event_received = FALSE;
1445
1446   /* we deleted a lot of something */
1447   GST_QUEUE2_SIGNAL_DEL (queue);
1448 }
1449
1450 static gboolean
1451 gst_queue2_wait_free_space (GstQueue2 * queue)
1452 {
1453   /* We make space available if we're "full" according to whatever
1454    * the user defined as "full". */
1455   if (gst_queue2_is_filled (queue)) {
1456     gboolean started;
1457
1458     /* pause the timer while we wait. The fact that we are waiting does not mean
1459      * the byterate on the input pad is lower */
1460     if ((started = queue->in_timer_started))
1461       g_timer_stop (queue->in_timer);
1462
1463     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1464         "queue is full, waiting for free space");
1465     do {
1466       /* Wait for space to be available, we could be unlocked because of a flush. */
1467       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1468     }
1469     while (gst_queue2_is_filled (queue));
1470
1471     /* and continue if we were running before */
1472     if (started)
1473       g_timer_continue (queue->in_timer);
1474   }
1475   return TRUE;
1476
1477   /* ERRORS */
1478 out_flushing:
1479   {
1480     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1481     return FALSE;
1482   }
1483 }
1484
1485 static gboolean
1486 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1487 {
1488   guint8 *odata, *data, *ring_buffer;
1489   guint size, rb_size;
1490   gsize osize;
1491   guint64 writing_pos, new_writing_pos;
1492   GstQueue2Range *range, *prev, *next;
1493
1494   if (QUEUE_IS_USING_RING_BUFFER (queue))
1495     writing_pos = queue->current->rb_writing_pos;
1496   else
1497     writing_pos = queue->current->writing_pos;
1498   ring_buffer = queue->ring_buffer;
1499   rb_size = queue->ring_buffer_max_size;
1500
1501   odata = gst_buffer_map (buffer, &osize, NULL, GST_MAP_READ);
1502
1503   size = osize;
1504   data = odata;
1505
1506   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1507       GST_BUFFER_OFFSET (buffer));
1508
1509   while (size > 0) {
1510     guint to_write;
1511
1512     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1513       gint64 space;
1514
1515       /* calculate the space in the ring buffer not used by data from
1516        * the current range */
1517       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1518         /* wait until there is some free space */
1519         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1520       }
1521       /* get the amount of space we have */
1522       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1523
1524       /* calculate if we need to split or if we can write the entire
1525        * buffer now */
1526       to_write = MIN (size, space);
1527
1528       /* the writing position in the ring buffer after writing (part
1529        * or all of) the buffer */
1530       new_writing_pos = (writing_pos + to_write) % rb_size;
1531
1532       prev = NULL;
1533       range = queue->ranges;
1534
1535       /* if we need to overwrite data in the ring buffer, we need to
1536        * update the ranges
1537        *
1538        * warning: this code is complicated and includes some
1539        * simplifications - pen, paper and diagrams for the cases
1540        * recommended! */
1541       while (range) {
1542         guint64 range_data_start, range_data_end;
1543         GstQueue2Range *range_to_destroy = NULL;
1544
1545         range_data_start = range->rb_offset;
1546         range_data_end = range->rb_writing_pos;
1547
1548         /* handle the special case where the range has no data in it */
1549         if (range->writing_pos == range->offset) {
1550           if (range != queue->current) {
1551             GST_DEBUG_OBJECT (queue,
1552                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1553                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1554             /* remove range */
1555             range_to_destroy = range;
1556             if (prev)
1557               prev->next = range->next;
1558           }
1559           goto next_range;
1560         }
1561
1562         if (range_data_end > range_data_start) {
1563           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1564             goto next_range;
1565
1566           if (new_writing_pos > range_data_start) {
1567             if (new_writing_pos >= range_data_end) {
1568               GST_DEBUG_OBJECT (queue,
1569                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1570                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1571               /* remove range */
1572               range_to_destroy = range;
1573               if (prev)
1574                 prev->next = range->next;
1575             } else {
1576               GST_DEBUG_OBJECT (queue,
1577                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1578                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1579                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1580                   range->offset + new_writing_pos - range_data_start,
1581                   new_writing_pos);
1582               range->offset += (new_writing_pos - range_data_start);
1583               range->rb_offset = new_writing_pos;
1584             }
1585           }
1586         } else {
1587           guint64 new_wpos_virt = writing_pos + to_write;
1588
1589           if (new_wpos_virt <= range_data_start)
1590             goto next_range;
1591
1592           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1593             GST_DEBUG_OBJECT (queue,
1594                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1595                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1596             /* remove range */
1597             range_to_destroy = range;
1598             if (prev)
1599               prev->next = range->next;
1600           } else {
1601             GST_DEBUG_OBJECT (queue,
1602                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1603                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1604                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1605                 range->offset + new_writing_pos - range_data_start,
1606                 new_writing_pos);
1607             range->offset += (new_wpos_virt - range_data_start);
1608             range->rb_offset = new_writing_pos;
1609           }
1610         }
1611
1612       next_range:
1613         if (!range_to_destroy)
1614           prev = range;
1615
1616         range = range->next;
1617         if (range_to_destroy) {
1618           if (range_to_destroy == queue->ranges)
1619             queue->ranges = range;
1620           g_slice_free (GstQueue2Range, range_to_destroy);
1621           range_to_destroy = NULL;
1622         }
1623       }
1624     } else {
1625       to_write = size;
1626       new_writing_pos = writing_pos + to_write;
1627     }
1628
1629     if (QUEUE_IS_USING_TEMP_FILE (queue)
1630         && FSEEK_FILE (queue->temp_file, writing_pos))
1631       goto seek_failed;
1632
1633     if (new_writing_pos > writing_pos) {
1634       GST_INFO_OBJECT (queue,
1635           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1636           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
1637           queue->current->writing_pos, queue->current->rb_writing_pos);
1638       /* either not using ring buffer or no wrapping, just write */
1639       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1640         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
1641           goto handle_error;
1642       } else {
1643         memcpy (ring_buffer + writing_pos, data, to_write);
1644       }
1645
1646       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1647         /* try to merge with next range */
1648         while ((next = queue->current->next)) {
1649           GST_INFO_OBJECT (queue,
1650               "checking merge with next range %" G_GUINT64_FORMAT " < %"
1651               G_GUINT64_FORMAT, new_writing_pos, next->offset);
1652           if (new_writing_pos < next->offset)
1653             break;
1654
1655           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
1656               next->writing_pos);
1657
1658           /* remove the group, we could choose to not read the data in this range
1659            * again. This would involve us doing a seek to the current writing position
1660            * in the range. FIXME, It would probably make sense to do a seek when there
1661            * is a lot of data in the range we merged with to avoid reading it all
1662            * again. */
1663           queue->current->next = next->next;
1664           g_slice_free (GstQueue2Range, next);
1665
1666           debug_ranges (queue);
1667         }
1668         goto update_and_signal;
1669       }
1670     } else {
1671       /* wrapping */
1672       guint block_one, block_two;
1673
1674       block_one = rb_size - writing_pos;
1675       block_two = to_write - block_one;
1676
1677       if (block_one > 0) {
1678         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
1679         /* write data to end of ring buffer */
1680         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1681           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
1682             goto handle_error;
1683         } else {
1684           memcpy (ring_buffer + writing_pos, data, block_one);
1685         }
1686       }
1687
1688       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
1689         goto seek_failed;
1690
1691       if (block_two > 0) {
1692         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
1693         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1694           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
1695             goto handle_error;
1696         } else {
1697           memcpy (ring_buffer, data + block_one, block_two);
1698         }
1699       }
1700     }
1701
1702   update_and_signal:
1703     /* update the writing positions */
1704     size -= to_write;
1705     GST_INFO_OBJECT (queue,
1706         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
1707         to_write, writing_pos, size);
1708
1709     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1710       data += to_write;
1711       queue->current->writing_pos += to_write;
1712       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
1713     } else {
1714       queue->current->writing_pos = writing_pos = new_writing_pos;
1715     }
1716     update_cur_level (queue, queue->current);
1717
1718     /* update the buffering status */
1719     if (queue->use_buffering)
1720       update_buffering (queue);
1721
1722     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1723         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1724
1725     GST_QUEUE2_SIGNAL_ADD (queue);
1726   }
1727
1728   gst_buffer_unmap (buffer, odata, osize);
1729
1730   return TRUE;
1731
1732   /* ERRORS */
1733 out_flushing:
1734   {
1735     GST_DEBUG_OBJECT (queue, "we are flushing");
1736     gst_buffer_unmap (buffer, odata, osize);
1737     /* FIXME - GST_FLOW_EOS ? */
1738     return FALSE;
1739   }
1740 seek_failed:
1741   {
1742     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1743     gst_buffer_unmap (buffer, odata, osize);
1744     return FALSE;
1745   }
1746 handle_error:
1747   {
1748     switch (errno) {
1749       case ENOSPC:{
1750         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
1751         break;
1752       }
1753       default:{
1754         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
1755             (_("Error while writing to download file.")),
1756             ("%s", g_strerror (errno)));
1757       }
1758     }
1759     gst_buffer_unmap (buffer, odata, osize);
1760     return FALSE;
1761   }
1762 }
1763
1764 /* enqueue an item an update the level stats */
1765 static void
1766 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
1767 {
1768   if (isbuffer) {
1769     GstBuffer *buffer;
1770     guint size;
1771
1772     buffer = GST_BUFFER_CAST (item);
1773     size = gst_buffer_get_size (buffer);
1774
1775     /* add buffer to the statistics */
1776     if (QUEUE_IS_USING_QUEUE (queue)) {
1777       queue->cur_level.buffers++;
1778       queue->cur_level.bytes += size;
1779     }
1780     queue->bytes_in += size;
1781
1782     /* apply new buffer to segment stats */
1783     apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
1784     /* update the byterate stats */
1785     update_in_rates (queue);
1786
1787     if (!QUEUE_IS_USING_QUEUE (queue)) {
1788       /* FIXME - check return value? */
1789       gst_queue2_create_write (queue, buffer);
1790     }
1791   } else if (GST_IS_EVENT (item)) {
1792     GstEvent *event;
1793
1794     event = GST_EVENT_CAST (item);
1795
1796     switch (GST_EVENT_TYPE (event)) {
1797       case GST_EVENT_EOS:
1798         /* Zero the thresholds, this makes sure the queue is completely
1799          * filled and we can read all data from the queue. */
1800         GST_DEBUG_OBJECT (queue, "we have EOS");
1801         queue->is_eos = TRUE;
1802         break;
1803       case GST_EVENT_SEGMENT:
1804         apply_segment (queue, event, &queue->sink_segment, TRUE);
1805         /* This is our first new segment, we hold it
1806          * as we can't save it on the temp file */
1807         if (!QUEUE_IS_USING_QUEUE (queue)) {
1808           if (queue->segment_event_received)
1809             goto unexpected_event;
1810
1811           queue->segment_event_received = TRUE;
1812           if (queue->starting_segment != NULL)
1813             gst_event_unref (queue->starting_segment);
1814           queue->starting_segment = event;
1815           item = NULL;
1816         }
1817         /* a new segment allows us to accept more buffers if we got EOS
1818          * from downstream */
1819         queue->unexpected = FALSE;
1820         break;
1821       default:
1822         if (!QUEUE_IS_USING_QUEUE (queue))
1823           goto unexpected_event;
1824         break;
1825     }
1826   } else {
1827     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
1828         item, GST_OBJECT_NAME (queue));
1829     /* we can't really unref since we don't know what it is */
1830     item = NULL;
1831   }
1832
1833   if (item) {
1834     /* update the buffering status */
1835     if (queue->use_buffering)
1836       update_buffering (queue);
1837
1838     if (QUEUE_IS_USING_QUEUE (queue)) {
1839       g_queue_push_tail (&queue->queue, item);
1840     } else {
1841       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
1842     }
1843
1844     GST_QUEUE2_SIGNAL_ADD (queue);
1845   }
1846
1847   return;
1848
1849   /* ERRORS */
1850 unexpected_event:
1851   {
1852     g_warning
1853         ("Unexpected event of kind %s can't be added in temp file of queue %s ",
1854         gst_event_type_get_name (GST_EVENT_TYPE (item)),
1855         GST_OBJECT_NAME (queue));
1856     gst_event_unref (GST_EVENT_CAST (item));
1857     return;
1858   }
1859 }
1860
1861 /* dequeue an item from the queue and update level stats */
1862 static GstMiniObject *
1863 gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
1864 {
1865   GstMiniObject *item;
1866
1867   if (!QUEUE_IS_USING_QUEUE (queue))
1868     item = gst_queue2_read_item_from_file (queue);
1869   else
1870     item = g_queue_pop_head (&queue->queue);
1871
1872   if (item == NULL)
1873     goto no_item;
1874
1875   if (GST_IS_BUFFER (item)) {
1876     GstBuffer *buffer;
1877     guint size;
1878
1879     buffer = GST_BUFFER_CAST (item);
1880     size = gst_buffer_get_size (buffer);
1881     *is_buffer = TRUE;
1882
1883     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1884         "retrieved buffer %p from queue", buffer);
1885
1886     if (QUEUE_IS_USING_QUEUE (queue)) {
1887       queue->cur_level.buffers--;
1888       queue->cur_level.bytes -= size;
1889     }
1890     queue->bytes_out += size;
1891
1892     apply_buffer (queue, buffer, &queue->src_segment, FALSE);
1893     /* update the byterate stats */
1894     update_out_rates (queue);
1895     /* update the buffering */
1896     if (queue->use_buffering)
1897       update_buffering (queue);
1898
1899   } else if (GST_IS_EVENT (item)) {
1900     GstEvent *event = GST_EVENT_CAST (item);
1901
1902     *is_buffer = FALSE;
1903
1904     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1905         "retrieved event %p from queue", event);
1906
1907     switch (GST_EVENT_TYPE (event)) {
1908       case GST_EVENT_EOS:
1909         /* queue is empty now that we dequeued the EOS */
1910         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1911         break;
1912       case GST_EVENT_SEGMENT:
1913         apply_segment (queue, event, &queue->src_segment, FALSE);
1914         break;
1915       default:
1916         break;
1917     }
1918   } else {
1919     g_warning
1920         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
1921         item, GST_OBJECT_NAME (queue));
1922     item = NULL;
1923   }
1924   GST_QUEUE2_SIGNAL_DEL (queue);
1925
1926   return item;
1927
1928   /* ERRORS */
1929 no_item:
1930   {
1931     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
1932     return NULL;
1933   }
1934 }
1935
1936 static gboolean
1937 gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
1938 {
1939   GstQueue2 *queue;
1940
1941   queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
1942
1943   switch (GST_EVENT_TYPE (event)) {
1944     case GST_EVENT_FLUSH_START:
1945     {
1946       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
1947       if (QUEUE_IS_USING_QUEUE (queue)) {
1948         /* forward event */
1949         gst_pad_push_event (queue->srcpad, event);
1950
1951         /* now unblock the chain function */
1952         GST_QUEUE2_MUTEX_LOCK (queue);
1953         queue->srcresult = GST_FLOW_WRONG_STATE;
1954         queue->sinkresult = GST_FLOW_WRONG_STATE;
1955         /* unblock the loop and chain functions */
1956         GST_QUEUE2_SIGNAL_ADD (queue);
1957         GST_QUEUE2_SIGNAL_DEL (queue);
1958         GST_QUEUE2_MUTEX_UNLOCK (queue);
1959
1960         /* make sure it pauses, this should happen since we sent
1961          * flush_start downstream. */
1962         gst_pad_pause_task (queue->srcpad);
1963         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
1964       } else {
1965         GST_QUEUE2_MUTEX_LOCK (queue);
1966         /* flush the sink pad */
1967         queue->sinkresult = GST_FLOW_WRONG_STATE;
1968         GST_QUEUE2_SIGNAL_DEL (queue);
1969         GST_QUEUE2_MUTEX_UNLOCK (queue);
1970
1971         gst_event_unref (event);
1972       }
1973       goto done;
1974     }
1975     case GST_EVENT_FLUSH_STOP:
1976     {
1977       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
1978
1979       if (QUEUE_IS_USING_QUEUE (queue)) {
1980         /* forward event */
1981         gst_pad_push_event (queue->srcpad, event);
1982
1983         GST_QUEUE2_MUTEX_LOCK (queue);
1984         gst_queue2_locked_flush (queue);
1985         queue->srcresult = GST_FLOW_OK;
1986         queue->sinkresult = GST_FLOW_OK;
1987         queue->is_eos = FALSE;
1988         queue->unexpected = FALSE;
1989         /* reset rate counters */
1990         reset_rate_timer (queue);
1991         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
1992             queue->srcpad);
1993         GST_QUEUE2_MUTEX_UNLOCK (queue);
1994       } else {
1995         GST_QUEUE2_MUTEX_LOCK (queue);
1996         queue->segment_event_received = FALSE;
1997         queue->is_eos = FALSE;
1998         queue->unexpected = FALSE;
1999         queue->sinkresult = GST_FLOW_OK;
2000         GST_QUEUE2_MUTEX_UNLOCK (queue);
2001
2002         gst_event_unref (event);
2003       }
2004       goto done;
2005     }
2006     default:
2007       if (GST_EVENT_IS_SERIALIZED (event)) {
2008         /* serialized events go in the queue */
2009         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2010         /* refuse more events on EOS */
2011         if (queue->is_eos)
2012           goto out_eos;
2013         gst_queue2_locked_enqueue (queue, event, FALSE);
2014         GST_QUEUE2_MUTEX_UNLOCK (queue);
2015       } else {
2016         /* non-serialized events are passed upstream. */
2017         gst_pad_push_event (queue->srcpad, event);
2018       }
2019       break;
2020   }
2021 done:
2022   return TRUE;
2023
2024   /* ERRORS */
2025 out_flushing:
2026   {
2027     GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2028     GST_QUEUE2_MUTEX_UNLOCK (queue);
2029     gst_event_unref (event);
2030     return FALSE;
2031   }
2032 out_eos:
2033   {
2034     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2035     GST_QUEUE2_MUTEX_UNLOCK (queue);
2036     gst_event_unref (event);
2037     return FALSE;
2038   }
2039 }
2040
2041 static gboolean
2042 gst_queue2_handle_sink_query (GstPad * pad, GstQuery * query)
2043 {
2044   gboolean res;
2045   GstQueue2 *queue;
2046
2047   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2048   if (G_UNLIKELY (queue == NULL))
2049     return FALSE;
2050
2051   switch (GST_QUERY_TYPE (query)) {
2052     default:
2053       res = gst_pad_peer_query (queue->srcpad, query);
2054       break;
2055   }
2056   gst_object_unref (queue);
2057
2058   return res;
2059 }
2060
2061 static gboolean
2062 gst_queue2_is_empty (GstQueue2 * queue)
2063 {
2064   /* never empty on EOS */
2065   if (queue->is_eos)
2066     return FALSE;
2067
2068   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2069     return queue->current->writing_pos <= queue->current->max_reading_pos;
2070   } else {
2071     if (queue->queue.length == 0)
2072       return TRUE;
2073   }
2074
2075   return FALSE;
2076 }
2077
2078 static gboolean
2079 gst_queue2_is_filled (GstQueue2 * queue)
2080 {
2081   gboolean res;
2082
2083   /* always filled on EOS */
2084   if (queue->is_eos)
2085     return TRUE;
2086
2087 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2088     (queue->cur_level.format) >= ((alt_max) ? \
2089       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2090
2091   /* if using a ring buffer we're filled if all ring buffer space is used
2092    * _by the current range_ */
2093   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2094     guint64 rb_size = queue->ring_buffer_max_size;
2095     GST_DEBUG_OBJECT (queue,
2096         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2097         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2098     return CHECK_FILLED (bytes, rb_size);
2099   }
2100
2101   /* if using file, we're never filled if we don't have EOS */
2102   if (QUEUE_IS_USING_TEMP_FILE (queue))
2103     return FALSE;
2104
2105   /* we are never filled when we have no buffers at all */
2106   if (queue->cur_level.buffers == 0)
2107     return FALSE;
2108
2109   /* we are filled if one of the current levels exceeds the max */
2110   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2111       || CHECK_FILLED (time, 0);
2112
2113   /* if we need to, use the rate estimate to check against the max time we are
2114    * allowed to queue */
2115   if (queue->use_rate_estimate)
2116     res |= CHECK_FILLED (rate_time, 0);
2117
2118 #undef CHECK_FILLED
2119   return res;
2120 }
2121
2122 static GstFlowReturn
2123 gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
2124 {
2125   GstQueue2 *queue;
2126
2127   queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
2128
2129   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
2130       G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2131       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2132       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2133       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2134
2135   /* we have to lock the queue since we span threads */
2136   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2137   /* when we received EOS, we refuse more data */
2138   if (queue->is_eos)
2139     goto out_eos;
2140   /* when we received unexpected from downstream, refuse more buffers */
2141   if (queue->unexpected)
2142     goto out_unexpected;
2143
2144   if (!gst_queue2_wait_free_space (queue))
2145     goto out_flushing;
2146
2147   /* put buffer in queue now */
2148   gst_queue2_locked_enqueue (queue, buffer, TRUE);
2149   GST_QUEUE2_MUTEX_UNLOCK (queue);
2150
2151   return GST_FLOW_OK;
2152
2153   /* special conditions */
2154 out_flushing:
2155   {
2156     GstFlowReturn ret = queue->sinkresult;
2157
2158     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2159         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2160     GST_QUEUE2_MUTEX_UNLOCK (queue);
2161     gst_buffer_unref (buffer);
2162
2163     return ret;
2164   }
2165 out_eos:
2166   {
2167     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2168     GST_QUEUE2_MUTEX_UNLOCK (queue);
2169     gst_buffer_unref (buffer);
2170
2171     return GST_FLOW_EOS;
2172   }
2173 out_unexpected:
2174   {
2175     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2176     GST_QUEUE2_MUTEX_UNLOCK (queue);
2177     gst_buffer_unref (buffer);
2178
2179     return GST_FLOW_EOS;
2180   }
2181 }
2182
2183 /* dequeue an item from the queue an push it downstream. This functions returns
2184  * the result of the push. */
2185 static GstFlowReturn
2186 gst_queue2_push_one (GstQueue2 * queue)
2187 {
2188   GstFlowReturn result = GST_FLOW_OK;
2189   GstMiniObject *data;
2190   gboolean is_buffer = FALSE;
2191
2192   data = gst_queue2_locked_dequeue (queue, &is_buffer);
2193   if (data == NULL)
2194     goto no_item;
2195
2196 next:
2197   GST_QUEUE2_MUTEX_UNLOCK (queue);
2198
2199   if (is_buffer) {
2200     GstBuffer *buffer;
2201 #if 0
2202     GstCaps *caps;
2203 #endif
2204
2205     buffer = GST_BUFFER_CAST (data);
2206 #if 0
2207     caps = GST_BUFFER_CAPS (buffer);
2208 #endif
2209
2210 #if 0
2211     /* set caps before pushing the buffer so that core does not try to do
2212      * something fancy to check if this is possible. */
2213     if (caps && caps != GST_PAD_CAPS (queue->srcpad))
2214       gst_pad_set_caps (queue->srcpad, caps);
2215 #endif
2216
2217     result = gst_pad_push (queue->srcpad, buffer);
2218
2219     /* need to check for srcresult here as well */
2220     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2221     if (result == GST_FLOW_EOS) {
2222       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2223       /* stop pushing buffers, we dequeue all items until we see an item that we
2224        * can push again, which is EOS or SEGMENT. If there is nothing in the
2225        * queue we can push, we set a flag to make the sinkpad refuse more
2226        * buffers with an EOS return value until we receive something
2227        * pushable again or we get flushed. */
2228       while ((data = gst_queue2_locked_dequeue (queue, &is_buffer))) {
2229         if (is_buffer) {
2230           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2231               "dropping EOS buffer %p", data);
2232           gst_buffer_unref (GST_BUFFER_CAST (data));
2233         } else if (GST_IS_EVENT (data)) {
2234           GstEvent *event = GST_EVENT_CAST (data);
2235           GstEventType type = GST_EVENT_TYPE (event);
2236
2237           if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2238             /* we found a pushable item in the queue, push it out */
2239             GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2240                 "pushing pushable event %s after EOS",
2241                 GST_EVENT_TYPE_NAME (event));
2242             goto next;
2243           }
2244           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2245               "dropping EOS event %p", event);
2246           gst_event_unref (event);
2247         }
2248       }
2249       /* no more items in the queue. Set the unexpected flag so that upstream
2250        * make us refuse any more buffers on the sinkpad. Since we will still
2251        * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2252        * task function does not shut down. */
2253       queue->unexpected = TRUE;
2254       result = GST_FLOW_OK;
2255     }
2256   } else if (GST_IS_EVENT (data)) {
2257     GstEvent *event = GST_EVENT_CAST (data);
2258     GstEventType type = GST_EVENT_TYPE (event);
2259
2260     gst_pad_push_event (queue->srcpad, event);
2261
2262     /* if we're EOS, return EOS so that the task pauses. */
2263     if (type == GST_EVENT_EOS) {
2264       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2265           "pushed EOS event %p, return EOS", event);
2266       result = GST_FLOW_EOS;
2267     }
2268
2269     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2270   }
2271   return result;
2272
2273   /* ERRORS */
2274 no_item:
2275   {
2276     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2277         "exit because we have no item in the queue");
2278     return GST_FLOW_ERROR;
2279   }
2280 out_flushing:
2281   {
2282     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2283     return GST_FLOW_WRONG_STATE;
2284   }
2285 }
2286
2287 /* called repeatedly with @pad as the source pad. This function should push out
2288  * data to the peer element. */
2289 static void
2290 gst_queue2_loop (GstPad * pad)
2291 {
2292   GstQueue2 *queue;
2293   GstFlowReturn ret;
2294
2295   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2296
2297   /* have to lock for thread-safety */
2298   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2299
2300   if (gst_queue2_is_empty (queue)) {
2301     gboolean started;
2302
2303     /* pause the timer while we wait. The fact that we are waiting does not mean
2304      * the byterate on the output pad is lower */
2305     if ((started = queue->out_timer_started))
2306       g_timer_stop (queue->out_timer);
2307
2308     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2309         "queue is empty, waiting for new data");
2310     do {
2311       /* Wait for data to be available, we could be unlocked because of a flush. */
2312       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2313     }
2314     while (gst_queue2_is_empty (queue));
2315
2316     /* and continue if we were running before */
2317     if (started)
2318       g_timer_continue (queue->out_timer);
2319   }
2320   ret = gst_queue2_push_one (queue);
2321   queue->srcresult = ret;
2322   queue->sinkresult = ret;
2323   if (ret != GST_FLOW_OK)
2324     goto out_flushing;
2325
2326   GST_QUEUE2_MUTEX_UNLOCK (queue);
2327
2328   return;
2329
2330   /* ERRORS */
2331 out_flushing:
2332   {
2333     gboolean eos = queue->is_eos;
2334     GstFlowReturn ret = queue->srcresult;
2335
2336     gst_pad_pause_task (queue->srcpad);
2337     GST_QUEUE2_MUTEX_UNLOCK (queue);
2338     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2339         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
2340     /* let app know about us giving up if upstream is not expected to do so */
2341     /* EOS is already taken care of elsewhere */
2342     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
2343       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2344           (_("Internal data flow error.")),
2345           ("streaming task paused, reason %s (%d)",
2346               gst_flow_get_name (ret), ret));
2347       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
2348     }
2349     return;
2350   }
2351 }
2352
2353 static gboolean
2354 gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
2355 {
2356   gboolean res = TRUE;
2357   GstQueue2 *queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2358
2359   if (G_UNLIKELY (queue == NULL)) {
2360     gst_event_unref (event);
2361     return FALSE;
2362   }
2363 #ifndef GST_DISABLE_GST_DEBUG
2364   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
2365       event, GST_EVENT_TYPE_NAME (event));
2366 #endif
2367
2368   switch (GST_EVENT_TYPE (event)) {
2369     case GST_EVENT_FLUSH_START:
2370       if (QUEUE_IS_USING_QUEUE (queue)) {
2371         /* just forward upstream */
2372         res = gst_pad_push_event (queue->sinkpad, event);
2373       } else {
2374         /* now unblock the getrange function */
2375         GST_QUEUE2_MUTEX_LOCK (queue);
2376         GST_DEBUG_OBJECT (queue, "flushing");
2377         queue->srcresult = GST_FLOW_WRONG_STATE;
2378         GST_QUEUE2_SIGNAL_ADD (queue);
2379         GST_QUEUE2_MUTEX_UNLOCK (queue);
2380
2381         /* when using a temp file, we eat the event */
2382         res = TRUE;
2383         gst_event_unref (event);
2384       }
2385       break;
2386     case GST_EVENT_FLUSH_STOP:
2387       if (QUEUE_IS_USING_QUEUE (queue)) {
2388         /* just forward upstream */
2389         res = gst_pad_push_event (queue->sinkpad, event);
2390       } else {
2391         /* now unblock the getrange function */
2392         GST_QUEUE2_MUTEX_LOCK (queue);
2393         queue->srcresult = GST_FLOW_OK;
2394         if (queue->current) {
2395           /* forget the highest read offset, we'll calculate a new one when we
2396            * get the next getrange request. We need to do this in order to reset
2397            * the buffering percentage */
2398           queue->current->max_reading_pos = 0;
2399         }
2400         GST_QUEUE2_MUTEX_UNLOCK (queue);
2401
2402         /* when using a temp file, we eat the event */
2403         res = TRUE;
2404         gst_event_unref (event);
2405       }
2406       break;
2407     default:
2408       res = gst_pad_push_event (queue->sinkpad, event);
2409       break;
2410   }
2411
2412   gst_object_unref (queue);
2413   return res;
2414 }
2415
2416 static gboolean
2417 gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
2418 {
2419   GstQueue2 *queue;
2420
2421   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2422   if (G_UNLIKELY (queue == NULL))
2423     return FALSE;
2424
2425   switch (GST_QUERY_TYPE (query)) {
2426     case GST_QUERY_POSITION:
2427     {
2428       gint64 peer_pos;
2429       GstFormat format;
2430
2431       if (!gst_pad_peer_query (queue->sinkpad, query))
2432         goto peer_failed;
2433
2434       /* get peer position */
2435       gst_query_parse_position (query, &format, &peer_pos);
2436
2437       /* FIXME: this code assumes that there's no discont in the queue */
2438       switch (format) {
2439         case GST_FORMAT_BYTES:
2440           peer_pos -= queue->cur_level.bytes;
2441           break;
2442         case GST_FORMAT_TIME:
2443           peer_pos -= queue->cur_level.time;
2444           break;
2445         default:
2446           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
2447               "know how to adjust value", gst_format_get_name (format));
2448           return FALSE;
2449       }
2450       /* set updated position */
2451       gst_query_set_position (query, format, peer_pos);
2452       break;
2453     }
2454     case GST_QUERY_DURATION:
2455     {
2456       GST_DEBUG_OBJECT (queue, "doing peer query");
2457
2458       if (!gst_pad_peer_query (queue->sinkpad, query))
2459         goto peer_failed;
2460
2461       GST_DEBUG_OBJECT (queue, "peer query success");
2462       break;
2463     }
2464     case GST_QUERY_BUFFERING:
2465     {
2466       GstFormat format;
2467
2468       GST_DEBUG_OBJECT (queue, "query buffering");
2469
2470       /* FIXME - is this condition correct? what should ring buffer do? */
2471       if (QUEUE_IS_USING_QUEUE (queue)) {
2472         /* no temp file, just forward to the peer */
2473         if (!gst_pad_peer_query (queue->sinkpad, query))
2474           goto peer_failed;
2475         GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
2476       } else {
2477         gint64 start, stop, range_start, range_stop;
2478         guint64 writing_pos;
2479         gint percent;
2480         gint64 estimated_total, buffering_left;
2481         gint64 duration;
2482         gboolean peer_res, is_buffering, is_eos;
2483         gdouble byte_in_rate, byte_out_rate;
2484         GstQueue2Range *queued_ranges;
2485
2486         /* we need a current download region */
2487         if (queue->current == NULL)
2488           return FALSE;
2489
2490         writing_pos = queue->current->writing_pos;
2491         byte_in_rate = queue->byte_in_rate;
2492         byte_out_rate = queue->byte_out_rate;
2493         is_buffering = queue->is_buffering;
2494         is_eos = queue->is_eos;
2495         percent = queue->buffering_percent;
2496
2497         if (is_eos) {
2498           /* we're EOS, we know the duration in bytes now */
2499           peer_res = TRUE;
2500           duration = writing_pos;
2501         } else {
2502           /* get duration of upstream in bytes */
2503           peer_res = gst_pad_query_peer_duration (queue->sinkpad,
2504               GST_FORMAT_BYTES, &duration);
2505         }
2506
2507         /* calculate remaining and total download time */
2508         if (peer_res && byte_in_rate > 0.0) {
2509           estimated_total = (duration * 1000) / byte_in_rate;
2510           buffering_left = ((duration - writing_pos) * 1000) / byte_in_rate;
2511         } else {
2512           estimated_total = -1;
2513           buffering_left = -1;
2514         }
2515         GST_DEBUG_OBJECT (queue, "estimated %" G_GINT64_FORMAT ", left %"
2516             G_GINT64_FORMAT, estimated_total, buffering_left);
2517
2518         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
2519
2520         switch (format) {
2521           case GST_FORMAT_PERCENT:
2522             /* we need duration */
2523             if (!peer_res)
2524               goto peer_failed;
2525
2526             GST_DEBUG_OBJECT (queue,
2527                 "duration %" G_GINT64_FORMAT ", writing %" G_GINT64_FORMAT,
2528                 duration, writing_pos);
2529
2530             start = 0;
2531             /* get our available data relative to the duration */
2532             if (duration != -1)
2533               stop = GST_FORMAT_PERCENT_MAX * writing_pos / duration;
2534             else
2535               stop = -1;
2536             break;
2537           case GST_FORMAT_BYTES:
2538             start = 0;
2539             stop = writing_pos;
2540             break;
2541           default:
2542             start = -1;
2543             stop = -1;
2544             break;
2545         }
2546
2547         /* fill out the buffered ranges */
2548         for (queued_ranges = queue->ranges; queued_ranges;
2549             queued_ranges = queued_ranges->next) {
2550           switch (format) {
2551             case GST_FORMAT_PERCENT:
2552               if (duration == -1) {
2553                 range_start = 0;
2554                 range_stop = 0;
2555                 break;
2556               }
2557               range_start = 100 * queued_ranges->offset / duration;
2558               range_stop = 100 * queued_ranges->writing_pos / duration;
2559               break;
2560             case GST_FORMAT_BYTES:
2561               range_start = queued_ranges->offset;
2562               range_stop = queued_ranges->writing_pos;
2563               break;
2564             default:
2565               range_start = -1;
2566               range_stop = -1;
2567               break;
2568           }
2569           if (range_start == range_stop)
2570             continue;
2571           GST_DEBUG_OBJECT (queue,
2572               "range starting at %" G_GINT64_FORMAT " and finishing at %"
2573               G_GINT64_FORMAT, range_start, range_stop);
2574           gst_query_add_buffering_range (query, range_start, range_stop);
2575         }
2576
2577         gst_query_set_buffering_percent (query, is_buffering, percent);
2578         gst_query_set_buffering_range (query, format, start, stop,
2579             estimated_total);
2580         gst_query_set_buffering_stats (query, GST_BUFFERING_DOWNLOAD,
2581             byte_in_rate, byte_out_rate, buffering_left);
2582       }
2583       break;
2584     }
2585     case GST_QUERY_SCHEDULING:
2586     {
2587       gboolean pull_mode;
2588
2589       /* we can operate in pull mode when we are using a tempfile */
2590       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
2591
2592       gst_query_set_scheduling (query, pull_mode, pull_mode, FALSE, 0, -1, 1);
2593       break;
2594     }
2595     default:
2596       /* peer handled other queries */
2597       if (!gst_pad_peer_query (queue->sinkpad, query))
2598         goto peer_failed;
2599       break;
2600   }
2601
2602   gst_object_unref (queue);
2603   return TRUE;
2604
2605   /* ERRORS */
2606 peer_failed:
2607   {
2608     GST_DEBUG_OBJECT (queue, "failed peer query");
2609     gst_object_unref (queue);
2610     return FALSE;
2611   }
2612 }
2613
2614 static gboolean
2615 gst_queue2_handle_query (GstElement * element, GstQuery * query)
2616 {
2617   /* simply forward to the srcpad query function */
2618   return gst_queue2_handle_src_query (GST_QUEUE2_CAST (element)->srcpad, query);
2619 }
2620
2621 static void
2622 gst_queue2_update_upstream_size (GstQueue2 * queue)
2623 {
2624   gint64 upstream_size = -1;
2625
2626   if (gst_pad_query_peer_duration (queue->sinkpad, GST_FORMAT_BYTES,
2627           &upstream_size)) {
2628     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
2629     queue->upstream_size = upstream_size;
2630   }
2631 }
2632
2633 static GstFlowReturn
2634 gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
2635     GstBuffer ** buffer)
2636 {
2637   GstQueue2 *queue;
2638   GstFlowReturn ret;
2639
2640   queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
2641
2642   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
2643   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2644   offset = (offset == -1) ? queue->current->reading_pos : offset;
2645
2646   GST_DEBUG_OBJECT (queue,
2647       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
2648
2649   /* catch any reads beyond the size of the file here to make sure queue2
2650    * doesn't send seek events beyond the size of the file upstream, since
2651    * that would confuse elements such as souphttpsrc and/or http servers.
2652    * Demuxers often just loop until EOS at the end of the file to figure out
2653    * when they've read all the end-headers or index chunks. */
2654   if (G_UNLIKELY (offset >= queue->upstream_size)) {
2655     gst_queue2_update_upstream_size (queue);
2656     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
2657       goto out_unexpected;
2658   }
2659
2660   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
2661     gst_queue2_update_upstream_size (queue);
2662     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
2663       length = queue->upstream_size - offset;
2664       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
2665     }
2666   }
2667
2668   /* FIXME - function will block when the range is not yet available */
2669   ret = gst_queue2_create_read (queue, offset, length, buffer);
2670   GST_QUEUE2_MUTEX_UNLOCK (queue);
2671
2672   gst_object_unref (queue);
2673
2674   return ret;
2675
2676   /* ERRORS */
2677 out_flushing:
2678   {
2679     ret = queue->srcresult;
2680
2681     GST_DEBUG_OBJECT (queue, "we are flushing");
2682     GST_QUEUE2_MUTEX_UNLOCK (queue);
2683     gst_object_unref (queue);
2684     return ret;
2685   }
2686 out_unexpected:
2687   {
2688     GST_DEBUG_OBJECT (queue, "read beyond end of file");
2689     GST_QUEUE2_MUTEX_UNLOCK (queue);
2690     gst_object_unref (queue);
2691     return GST_FLOW_EOS;
2692   }
2693 }
2694
2695 /* sink currently only operates in push mode */
2696 static gboolean
2697 gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
2698 {
2699   gboolean result = TRUE;
2700   GstQueue2 *queue;
2701
2702   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2703
2704   if (active) {
2705     GST_QUEUE2_MUTEX_LOCK (queue);
2706     GST_DEBUG_OBJECT (queue, "activating push mode");
2707     queue->srcresult = GST_FLOW_OK;
2708     queue->sinkresult = GST_FLOW_OK;
2709     queue->is_eos = FALSE;
2710     queue->unexpected = FALSE;
2711     reset_rate_timer (queue);
2712     GST_QUEUE2_MUTEX_UNLOCK (queue);
2713   } else {
2714     /* unblock chain function */
2715     GST_QUEUE2_MUTEX_LOCK (queue);
2716     GST_DEBUG_OBJECT (queue, "deactivating push mode");
2717     queue->srcresult = GST_FLOW_WRONG_STATE;
2718     queue->sinkresult = GST_FLOW_WRONG_STATE;
2719     gst_queue2_locked_flush (queue);
2720     GST_QUEUE2_MUTEX_UNLOCK (queue);
2721   }
2722
2723   gst_object_unref (queue);
2724
2725   return result;
2726 }
2727
2728 /* src operating in push mode, we start a task on the source pad that pushes out
2729  * buffers from the queue */
2730 static gboolean
2731 gst_queue2_src_activate_push (GstPad * pad, gboolean active)
2732 {
2733   gboolean result = FALSE;
2734   GstQueue2 *queue;
2735
2736   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2737
2738   if (active) {
2739     GST_QUEUE2_MUTEX_LOCK (queue);
2740     GST_DEBUG_OBJECT (queue, "activating push mode");
2741     queue->srcresult = GST_FLOW_OK;
2742     queue->sinkresult = GST_FLOW_OK;
2743     queue->is_eos = FALSE;
2744     queue->unexpected = FALSE;
2745     result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad);
2746     GST_QUEUE2_MUTEX_UNLOCK (queue);
2747   } else {
2748     /* unblock loop function */
2749     GST_QUEUE2_MUTEX_LOCK (queue);
2750     GST_DEBUG_OBJECT (queue, "deactivating push mode");
2751     queue->srcresult = GST_FLOW_WRONG_STATE;
2752     queue->sinkresult = GST_FLOW_WRONG_STATE;
2753     /* the item add signal will unblock */
2754     GST_QUEUE2_SIGNAL_ADD (queue);
2755     GST_QUEUE2_MUTEX_UNLOCK (queue);
2756
2757     /* step 2, make sure streaming finishes */
2758     result = gst_pad_stop_task (pad);
2759   }
2760
2761   gst_object_unref (queue);
2762
2763   return result;
2764 }
2765
2766 /* pull mode, downstream will call our getrange function */
2767 static gboolean
2768 gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
2769 {
2770   gboolean result;
2771   GstQueue2 *queue;
2772
2773   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2774
2775   if (active) {
2776     GST_QUEUE2_MUTEX_LOCK (queue);
2777     if (!QUEUE_IS_USING_QUEUE (queue)) {
2778       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2779         /* open the temp file now */
2780         result = gst_queue2_open_temp_location_file (queue);
2781       } else if (!queue->ring_buffer) {
2782         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
2783         result = ! !queue->ring_buffer;
2784       } else {
2785         result = TRUE;
2786       }
2787
2788       GST_DEBUG_OBJECT (queue, "activating pull mode");
2789       init_ranges (queue);
2790       queue->srcresult = GST_FLOW_OK;
2791       queue->sinkresult = GST_FLOW_OK;
2792       queue->is_eos = FALSE;
2793       queue->unexpected = FALSE;
2794       queue->upstream_size = 0;
2795     } else {
2796       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
2797       /* this is not allowed, we cannot operate in pull mode without a temp
2798        * file. */
2799       queue->srcresult = GST_FLOW_WRONG_STATE;
2800       queue->sinkresult = GST_FLOW_WRONG_STATE;
2801       result = FALSE;
2802     }
2803     GST_QUEUE2_MUTEX_UNLOCK (queue);
2804   } else {
2805     GST_QUEUE2_MUTEX_LOCK (queue);
2806     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
2807     queue->srcresult = GST_FLOW_WRONG_STATE;
2808     queue->sinkresult = GST_FLOW_WRONG_STATE;
2809     /* this will unlock getrange */
2810     GST_QUEUE2_SIGNAL_ADD (queue);
2811     result = TRUE;
2812     GST_QUEUE2_MUTEX_UNLOCK (queue);
2813   }
2814   gst_object_unref (queue);
2815
2816   return result;
2817 }
2818
2819 static GstStateChangeReturn
2820 gst_queue2_change_state (GstElement * element, GstStateChange transition)
2821 {
2822   GstQueue2 *queue;
2823   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
2824
2825   queue = GST_QUEUE2 (element);
2826
2827   switch (transition) {
2828     case GST_STATE_CHANGE_NULL_TO_READY:
2829       break;
2830     case GST_STATE_CHANGE_READY_TO_PAUSED:
2831       GST_QUEUE2_MUTEX_LOCK (queue);
2832       if (!QUEUE_IS_USING_QUEUE (queue)) {
2833         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2834           if (!gst_queue2_open_temp_location_file (queue))
2835             ret = GST_STATE_CHANGE_FAILURE;
2836         } else {
2837           if (queue->ring_buffer) {
2838             g_free (queue->ring_buffer);
2839             queue->ring_buffer = NULL;
2840           }
2841           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
2842             ret = GST_STATE_CHANGE_FAILURE;
2843         }
2844         init_ranges (queue);
2845       }
2846       queue->segment_event_received = FALSE;
2847       queue->starting_segment = NULL;
2848       GST_QUEUE2_MUTEX_UNLOCK (queue);
2849       break;
2850     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2851       break;
2852     default:
2853       break;
2854   }
2855
2856   if (ret == GST_STATE_CHANGE_FAILURE)
2857     return ret;
2858
2859   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2860
2861   if (ret == GST_STATE_CHANGE_FAILURE)
2862     return ret;
2863
2864   switch (transition) {
2865     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2866       break;
2867     case GST_STATE_CHANGE_PAUSED_TO_READY:
2868       GST_QUEUE2_MUTEX_LOCK (queue);
2869       if (!QUEUE_IS_USING_QUEUE (queue)) {
2870         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2871           gst_queue2_close_temp_location_file (queue);
2872         } else if (queue->ring_buffer) {
2873           g_free (queue->ring_buffer);
2874           queue->ring_buffer = NULL;
2875         }
2876         clean_ranges (queue);
2877       }
2878       if (queue->starting_segment != NULL) {
2879         gst_event_unref (queue->starting_segment);
2880         queue->starting_segment = NULL;
2881       }
2882       GST_QUEUE2_MUTEX_UNLOCK (queue);
2883       break;
2884     case GST_STATE_CHANGE_READY_TO_NULL:
2885       break;
2886     default:
2887       break;
2888   }
2889
2890   return ret;
2891 }
2892
2893 /* changing the capacity of the queue must wake up
2894  * the _chain function, it might have more room now
2895  * to store the buffer/event in the queue */
2896 #define QUEUE_CAPACITY_CHANGE(q)\
2897   GST_QUEUE2_SIGNAL_DEL (queue);
2898
2899 /* Changing the minimum required fill level must
2900  * wake up the _loop function as it might now
2901  * be able to preceed.
2902  */
2903 #define QUEUE_THRESHOLD_CHANGE(q)\
2904   GST_QUEUE2_SIGNAL_ADD (queue);
2905
2906 static void
2907 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
2908 {
2909   GstState state;
2910
2911   /* the element must be stopped in order to do this */
2912   GST_OBJECT_LOCK (queue);
2913   state = GST_STATE (queue);
2914   if (state != GST_STATE_READY && state != GST_STATE_NULL)
2915     goto wrong_state;
2916   GST_OBJECT_UNLOCK (queue);
2917
2918   /* set new location */
2919   g_free (queue->temp_template);
2920   queue->temp_template = g_strdup (template);
2921
2922   return;
2923
2924 /* ERROR */
2925 wrong_state:
2926   {
2927     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
2928     GST_OBJECT_UNLOCK (queue);
2929   }
2930 }
2931
2932 static void
2933 gst_queue2_set_property (GObject * object,
2934     guint prop_id, const GValue * value, GParamSpec * pspec)
2935 {
2936   GstQueue2 *queue = GST_QUEUE2 (object);
2937
2938   /* someone could change levels here, and since this
2939    * affects the get/put funcs, we need to lock for safety. */
2940   GST_QUEUE2_MUTEX_LOCK (queue);
2941
2942   switch (prop_id) {
2943     case PROP_MAX_SIZE_BYTES:
2944       queue->max_level.bytes = g_value_get_uint (value);
2945       QUEUE_CAPACITY_CHANGE (queue);
2946       break;
2947     case PROP_MAX_SIZE_BUFFERS:
2948       queue->max_level.buffers = g_value_get_uint (value);
2949       QUEUE_CAPACITY_CHANGE (queue);
2950       break;
2951     case PROP_MAX_SIZE_TIME:
2952       queue->max_level.time = g_value_get_uint64 (value);
2953       /* set rate_time to the same value. We use an extra field in the level
2954        * structure so that we can easily access and compare it */
2955       queue->max_level.rate_time = queue->max_level.time;
2956       QUEUE_CAPACITY_CHANGE (queue);
2957       break;
2958     case PROP_USE_BUFFERING:
2959       queue->use_buffering = g_value_get_boolean (value);
2960       break;
2961     case PROP_USE_RATE_ESTIMATE:
2962       queue->use_rate_estimate = g_value_get_boolean (value);
2963       break;
2964     case PROP_LOW_PERCENT:
2965       queue->low_percent = g_value_get_int (value);
2966       break;
2967     case PROP_HIGH_PERCENT:
2968       queue->high_percent = g_value_get_int (value);
2969       break;
2970     case PROP_TEMP_TEMPLATE:
2971       gst_queue2_set_temp_template (queue, g_value_get_string (value));
2972       break;
2973     case PROP_TEMP_LOCATION:
2974       g_free (queue->temp_location);
2975       queue->temp_location = g_value_dup_string (value);
2976       /* you can set the property back to NULL to make it use the temp-tmpl
2977        * property. */
2978       queue->temp_location_set = queue->temp_location != NULL;
2979       break;
2980     case PROP_TEMP_REMOVE:
2981       queue->temp_remove = g_value_get_boolean (value);
2982       break;
2983     case PROP_RING_BUFFER_MAX_SIZE:
2984       queue->ring_buffer_max_size = g_value_get_uint64 (value);
2985       break;
2986     default:
2987       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2988       break;
2989   }
2990
2991   GST_QUEUE2_MUTEX_UNLOCK (queue);
2992 }
2993
2994 static void
2995 gst_queue2_get_property (GObject * object,
2996     guint prop_id, GValue * value, GParamSpec * pspec)
2997 {
2998   GstQueue2 *queue = GST_QUEUE2 (object);
2999
3000   GST_QUEUE2_MUTEX_LOCK (queue);
3001
3002   switch (prop_id) {
3003     case PROP_CUR_LEVEL_BYTES:
3004       g_value_set_uint (value, queue->cur_level.bytes);
3005       break;
3006     case PROP_CUR_LEVEL_BUFFERS:
3007       g_value_set_uint (value, queue->cur_level.buffers);
3008       break;
3009     case PROP_CUR_LEVEL_TIME:
3010       g_value_set_uint64 (value, queue->cur_level.time);
3011       break;
3012     case PROP_MAX_SIZE_BYTES:
3013       g_value_set_uint (value, queue->max_level.bytes);
3014       break;
3015     case PROP_MAX_SIZE_BUFFERS:
3016       g_value_set_uint (value, queue->max_level.buffers);
3017       break;
3018     case PROP_MAX_SIZE_TIME:
3019       g_value_set_uint64 (value, queue->max_level.time);
3020       break;
3021     case PROP_USE_BUFFERING:
3022       g_value_set_boolean (value, queue->use_buffering);
3023       break;
3024     case PROP_USE_RATE_ESTIMATE:
3025       g_value_set_boolean (value, queue->use_rate_estimate);
3026       break;
3027     case PROP_LOW_PERCENT:
3028       g_value_set_int (value, queue->low_percent);
3029       break;
3030     case PROP_HIGH_PERCENT:
3031       g_value_set_int (value, queue->high_percent);
3032       break;
3033     case PROP_TEMP_TEMPLATE:
3034       g_value_set_string (value, queue->temp_template);
3035       break;
3036     case PROP_TEMP_LOCATION:
3037       g_value_set_string (value, queue->temp_location);
3038       break;
3039     case PROP_TEMP_REMOVE:
3040       g_value_set_boolean (value, queue->temp_remove);
3041       break;
3042     case PROP_RING_BUFFER_MAX_SIZE:
3043       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3044       break;
3045     default:
3046       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3047       break;
3048   }
3049
3050   GST_QUEUE2_MUTEX_UNLOCK (queue);
3051 }