queue2: Add higher-resolution low/high-watermark properties
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue2:current-level-buffers property.
40  *
41  * The default queue size limits are 100 buffers, 2MB of data, or
42  * two seconds worth of data, whichever is reached first.
43  *
44  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
45  * will allocate a random free filename and buffer data in the file.
46  * By using this, it will buffer the entire stream data on the file independently
47  * of the queue size limits, they will only be used for buffering statistics.
48  *
49  * The temp-location property will be used to notify the application of the
50  * allocated filename.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56
57 #include "gstqueue2.h"
58
59 #include <glib/gstdio.h>
60
61 #include "gst/gst-i18n-lib.h"
62 #include "gst/glib-compat-private.h"
63
64 #include <string.h>
65
66 #ifdef G_OS_WIN32
67 #include <io.h>                 /* lseek, open, close, read */
68 #undef lseek
69 #define lseek _lseeki64
70 #undef off_t
71 #define off_t guint64
72 #else
73 #include <unistd.h>
74 #endif
75
76 #ifdef __BIONIC__               /* Android */
77 #undef lseek
78 #define lseek lseek64
79 #undef off_t
80 #define off_t guint64
81 #include <fcntl.h>
82 #endif
83
84 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
85     GST_PAD_SINK,
86     GST_PAD_ALWAYS,
87     GST_STATIC_CAPS_ANY);
88
89 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
90     GST_PAD_SRC,
91     GST_PAD_ALWAYS,
92     GST_STATIC_CAPS_ANY);
93
94 GST_DEBUG_CATEGORY_STATIC (queue_debug);
95 #define GST_CAT_DEFAULT (queue_debug)
96 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
97
98 enum
99 {
100   LAST_SIGNAL
101 };
102
103 /* other defines */
104 #define DEFAULT_BUFFER_SIZE 4096
105 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
106 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
107 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
108
109 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
110
111 /* default property values */
112 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
113 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
114 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
115 #define DEFAULT_USE_BUFFERING      FALSE
116 #define DEFAULT_USE_TAGS_BITRATE   FALSE
117 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
118 #define DEFAULT_LOW_PERCENT        10
119 #define DEFAULT_HIGH_PERCENT       99
120 #define DEFAULT_LOW_WATERMARK      0.01
121 #define DEFAULT_HIGH_WATERMARK     0.99
122 #define DEFAULT_TEMP_REMOVE        TRUE
123 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
124
125 enum
126 {
127   PROP_0,
128   PROP_CUR_LEVEL_BUFFERS,
129   PROP_CUR_LEVEL_BYTES,
130   PROP_CUR_LEVEL_TIME,
131   PROP_MAX_SIZE_BUFFERS,
132   PROP_MAX_SIZE_BYTES,
133   PROP_MAX_SIZE_TIME,
134   PROP_USE_BUFFERING,
135   PROP_USE_TAGS_BITRATE,
136   PROP_USE_RATE_ESTIMATE,
137   PROP_LOW_PERCENT,
138   PROP_HIGH_PERCENT,
139   PROP_LOW_WATERMARK,
140   PROP_HIGH_WATERMARK,
141   PROP_TEMP_TEMPLATE,
142   PROP_TEMP_LOCATION,
143   PROP_TEMP_REMOVE,
144   PROP_RING_BUFFER_MAX_SIZE,
145   PROP_AVG_IN_RATE,
146   PROP_LAST
147 };
148
149 /* Explanation for buffer levels and percentages:
150  *
151  * The buffering_level functions here return a value in a normalized range
152  * that specifies the queue's current fill level. The range goes from 0 to
153  * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
154  *
155  * This is not to be confused with the buffering_percent value, which is
156  * a *relative* quantity - relative to the low/high watermarks.
157  * buffering_percent = 0% means buffering_level is at the low watermark.
158  * buffering_percent = 100% means buffering_level is at the high watermark.
159  * buffering_percent is used for determining if the fill level has reached
160  * the high watermark, and for producing BUFFERING messages. This value
161  * always uses a 0..100 range (since it is a percentage).
162  *
163  * To avoid future confusions, whenever "buffering level" is mentioned, it
164  * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
165  * range. Whenever "buffering_percent" is mentioned, it refers to the
166  * percentage value that is relative to the low/high watermark. */
167
168 /* Using a buffering level range of 0..1000000 to allow for a
169  * resolution in ppm (1 ppm = 0.0001%) */
170 #define MAX_BUFFERING_LEVEL 1000000
171
172 /* How much 1% makes up in the buffer level range */
173 #define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
174
175 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
176   l.buffers = 0;                                        \
177   l.bytes = 0;                                          \
178   l.time = 0;                                           \
179   l.rate_time = 0;                                      \
180 } G_STMT_END
181
182 #define STATUS(queue, pad, msg) \
183   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
184                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
185                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
186                       " ns, %"G_GUINT64_FORMAT" items", \
187                       GST_DEBUG_PAD_NAME (pad), \
188                       queue->cur_level.buffers, \
189                       queue->max_level.buffers, \
190                       queue->cur_level.bytes, \
191                       queue->max_level.bytes, \
192                       queue->cur_level.time, \
193                       queue->max_level.time, \
194                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
195                         queue->current->writing_pos - queue->current->max_reading_pos : \
196                         queue->queue.length))
197
198 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
199   g_mutex_lock (&q->qlock);                                              \
200 } G_STMT_END
201
202 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
203   GST_QUEUE2_MUTEX_LOCK (q);                                            \
204   if (res != GST_FLOW_OK)                                               \
205     goto label;                                                         \
206 } G_STMT_END
207
208 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
209   g_mutex_unlock (&q->qlock);                                            \
210 } G_STMT_END
211
212 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
213   STATUS (queue, q->sinkpad, "wait for DEL");                           \
214   q->waiting_del = TRUE;                                                \
215   g_cond_wait (&q->item_del, &queue->qlock);                              \
216   q->waiting_del = FALSE;                                               \
217   if (res != GST_FLOW_OK) {                                             \
218     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
219     goto label;                                                         \
220   }                                                                     \
221   STATUS (queue, q->sinkpad, "received DEL");                           \
222 } G_STMT_END
223
224 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
225   STATUS (queue, q->srcpad, "wait for ADD");                            \
226   q->waiting_add = TRUE;                                                \
227   g_cond_wait (&q->item_add, &q->qlock);                                  \
228   q->waiting_add = FALSE;                                               \
229   if (res != GST_FLOW_OK) {                                             \
230     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
231     goto label;                                                         \
232   }                                                                     \
233   STATUS (queue, q->srcpad, "received ADD");                            \
234 } G_STMT_END
235
236 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
237   if (q->waiting_del) {                                                 \
238     STATUS (q, q->srcpad, "signal DEL");                                \
239     g_cond_signal (&q->item_del);                                        \
240   }                                                                     \
241 } G_STMT_END
242
243 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
244   if (q->waiting_add) {                                                 \
245     STATUS (q, q->sinkpad, "signal ADD");                               \
246     g_cond_signal (&q->item_add);                                        \
247   }                                                                     \
248 } G_STMT_END
249
250 #define SET_PERCENT(q, perc) G_STMT_START {                              \
251   if (perc != q->buffering_percent) {                                    \
252     q->buffering_percent = perc;                                         \
253     q->percent_changed = TRUE;                                           \
254     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
255     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
256         &q->buffering_left);                                             \
257   }                                                                      \
258 } G_STMT_END
259
260 #define _do_init \
261     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
262     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
263         "dataflow inside the queue element");
264 #define gst_queue2_parent_class parent_class
265 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
266
267 static void gst_queue2_finalize (GObject * object);
268
269 static void gst_queue2_set_property (GObject * object,
270     guint prop_id, const GValue * value, GParamSpec * pspec);
271 static void gst_queue2_get_property (GObject * object,
272     guint prop_id, GValue * value, GParamSpec * pspec);
273
274 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
275     GstBuffer * buffer);
276 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
277     GstBufferList * buffer_list);
278 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
279 static void gst_queue2_loop (GstPad * pad);
280
281 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
282     GstEvent * event);
283 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
284     GstQuery * query);
285
286 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
287     GstEvent * event);
288 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
289     GstQuery * query);
290 static gboolean gst_queue2_handle_query (GstElement * element,
291     GstQuery * query);
292
293 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
294     guint64 offset, guint length, GstBuffer ** buffer);
295
296 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
297     GstPadMode mode, gboolean active);
298 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
299     GstPadMode mode, gboolean active);
300 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
301     GstStateChange transition);
302
303 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
304 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
305
306 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
307 static void update_in_rates (GstQueue2 * queue, gboolean force);
308 static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue);
309 static void gst_queue2_post_buffering (GstQueue2 * queue);
310
311 typedef enum
312 {
313   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
314   GST_QUEUE2_ITEM_TYPE_BUFFER,
315   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
316   GST_QUEUE2_ITEM_TYPE_EVENT,
317   GST_QUEUE2_ITEM_TYPE_QUERY
318 } GstQueue2ItemType;
319
320 typedef struct
321 {
322   GstQueue2ItemType type;
323   GstMiniObject *item;
324 } GstQueue2Item;
325
326 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
327
328 static void
329 gst_queue2_class_init (GstQueue2Class * klass)
330 {
331   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
332   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
333
334   gobject_class->set_property = gst_queue2_set_property;
335   gobject_class->get_property = gst_queue2_get_property;
336
337   /* properties */
338   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
339       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
340           "Current amount of data in the queue (bytes)",
341           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
343       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
344           "Current number of buffers in the queue",
345           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
346   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
347       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
348           "Current amount of data in the queue (in ns)",
349           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
350
351   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
352       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
353           "Max. amount of data in the queue (bytes, 0=disable)",
354           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
355           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
356           G_PARAM_STATIC_STRINGS));
357   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
358       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
359           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
360           DEFAULT_MAX_SIZE_BUFFERS,
361           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
362           G_PARAM_STATIC_STRINGS));
363   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
364       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
365           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
366           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
367           G_PARAM_STATIC_STRINGS));
368
369   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
370       g_param_spec_boolean ("use-buffering", "Use buffering",
371           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
372           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
373           G_PARAM_STATIC_STRINGS));
374   g_object_class_install_property (gobject_class, PROP_USE_TAGS_BITRATE,
375       g_param_spec_boolean ("use-tags-bitrate", "Use bitrate from tags",
376           "Use a bitrate from upstream tags to estimate buffer duration if not provided",
377           DEFAULT_USE_TAGS_BITRATE,
378           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
379           G_PARAM_STATIC_STRINGS));
380   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
381       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
382           "Estimate the bitrate of the stream to calculate time level",
383           DEFAULT_USE_RATE_ESTIMATE,
384           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
385   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
386       g_param_spec_int ("low-percent", "Low percent",
387           "Low threshold for buffering to start. Only used if use-buffering is True "
388           "(Deprecated: use low-watermark instead)",
389           0, 100, DEFAULT_LOW_WATERMARK * 100,
390           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
391   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
392       g_param_spec_int ("high-percent", "High percent",
393           "High threshold for buffering to finish. Only used if use-buffering is True "
394           "(Deprecated: use high-watermark instead)",
395           0, 100, DEFAULT_HIGH_WATERMARK * 100,
396           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
397   g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
398       g_param_spec_double ("low-watermark", "Low watermark",
399           "Low threshold for buffering to start. Only used if use-buffering is True",
400           0.0, 1.0, DEFAULT_LOW_WATERMARK,
401           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
402   g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
403       g_param_spec_double ("high-watermark", "High watermark",
404           "High threshold for buffering to finish. Only used if use-buffering is True",
405           0.0, 1.0, DEFAULT_HIGH_WATERMARK,
406           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
407
408   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
409       g_param_spec_string ("temp-template", "Temporary File Template",
410           "File template to store temporary files in, should contain directory "
411           "and XXXXXX. (NULL == disabled)",
412           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413
414   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
415       g_param_spec_string ("temp-location", "Temporary File Location",
416           "Location to store temporary files in (Only read this property, "
417           "use temp-template to configure the name template)",
418           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
419
420   /**
421    * GstQueue2:temp-remove
422    *
423    * When temp-template is set, remove the temporary file when going to READY.
424    */
425   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
426       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
427           "Remove the temp-location after use",
428           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
429
430   /**
431    * GstQueue2:ring-buffer-max-size
432    *
433    * The maximum size of the ring buffer in bytes. If set to 0, the ring
434    * buffer is disabled. Default 0.
435    */
436   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
437       g_param_spec_uint64 ("ring-buffer-max-size",
438           "Max. ring buffer size (bytes)",
439           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
440           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
441           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442
443   /**
444    * GstQueue2:avg-in-rate
445    *
446    * The average input data rate.
447    */
448   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
449       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
450           "Average input data rate (bytes/s)",
451           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
452
453   /* set several parent class virtual functions */
454   gobject_class->finalize = gst_queue2_finalize;
455
456   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
457   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
458
459   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
460       "Generic",
461       "Simple data queue",
462       "Erik Walthinsen <omega@cse.ogi.edu>, "
463       "Wim Taymans <wim.taymans@gmail.com>");
464
465   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
466   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
467 }
468
469 static void
470 gst_queue2_init (GstQueue2 * queue)
471 {
472   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
473
474   gst_pad_set_chain_function (queue->sinkpad,
475       GST_DEBUG_FUNCPTR (gst_queue2_chain));
476   gst_pad_set_chain_list_function (queue->sinkpad,
477       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
478   gst_pad_set_activatemode_function (queue->sinkpad,
479       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
480   gst_pad_set_event_function (queue->sinkpad,
481       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
482   gst_pad_set_query_function (queue->sinkpad,
483       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
484   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
485   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
486
487   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
488
489   gst_pad_set_activatemode_function (queue->srcpad,
490       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
491   gst_pad_set_getrange_function (queue->srcpad,
492       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
493   gst_pad_set_event_function (queue->srcpad,
494       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
495   gst_pad_set_query_function (queue->srcpad,
496       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
497   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
498   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
499
500   /* levels */
501   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
502   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
503   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
504   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
505   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
506   queue->use_buffering = DEFAULT_USE_BUFFERING;
507   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
508   queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
509   queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
510
511   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
512   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
513
514   queue->sinktime = GST_CLOCK_TIME_NONE;
515   queue->srctime = GST_CLOCK_TIME_NONE;
516   queue->sink_tainted = TRUE;
517   queue->src_tainted = TRUE;
518
519   queue->srcresult = GST_FLOW_FLUSHING;
520   queue->sinkresult = GST_FLOW_FLUSHING;
521   queue->is_eos = FALSE;
522   queue->in_timer = g_timer_new ();
523   queue->out_timer = g_timer_new ();
524
525   g_mutex_init (&queue->qlock);
526   queue->waiting_add = FALSE;
527   g_cond_init (&queue->item_add);
528   queue->waiting_del = FALSE;
529   g_cond_init (&queue->item_del);
530   g_queue_init (&queue->queue);
531
532   g_cond_init (&queue->query_handled);
533   queue->last_query = FALSE;
534
535   g_mutex_init (&queue->buffering_post_lock);
536   queue->buffering_percent = 100;
537
538   /* tempfile related */
539   queue->temp_template = NULL;
540   queue->temp_location = NULL;
541   queue->temp_remove = DEFAULT_TEMP_REMOVE;
542
543   queue->ring_buffer = NULL;
544   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
545
546   GST_DEBUG_OBJECT (queue,
547       "initialized queue's not_empty & not_full conditions");
548 }
549
550 /* called only once, as opposed to dispose */
551 static void
552 gst_queue2_finalize (GObject * object)
553 {
554   GstQueue2 *queue = GST_QUEUE2 (object);
555
556   GST_DEBUG_OBJECT (queue, "finalizing queue");
557
558   while (!g_queue_is_empty (&queue->queue)) {
559     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
560
561     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
562       gst_mini_object_unref (qitem->item);
563     g_slice_free (GstQueue2Item, qitem);
564   }
565
566   queue->last_query = FALSE;
567   g_queue_clear (&queue->queue);
568   g_mutex_clear (&queue->qlock);
569   g_mutex_clear (&queue->buffering_post_lock);
570   g_cond_clear (&queue->item_add);
571   g_cond_clear (&queue->item_del);
572   g_cond_clear (&queue->query_handled);
573   g_timer_destroy (queue->in_timer);
574   g_timer_destroy (queue->out_timer);
575
576   /* temp_file path cleanup  */
577   g_free (queue->temp_template);
578   g_free (queue->temp_location);
579
580   G_OBJECT_CLASS (parent_class)->finalize (object);
581 }
582
583 static void
584 debug_ranges (GstQueue2 * queue)
585 {
586   GstQueue2Range *walk;
587
588   for (walk = queue->ranges; walk; walk = walk->next) {
589     GST_DEBUG_OBJECT (queue,
590         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
591         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
592         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
593         walk->rb_writing_pos, walk->reading_pos,
594         walk == queue->current ? "**y**" : "  n  ");
595   }
596 }
597
598 /* clear all the downloaded ranges */
599 static void
600 clean_ranges (GstQueue2 * queue)
601 {
602   GST_DEBUG_OBJECT (queue, "clean queue ranges");
603
604   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
605   queue->ranges = NULL;
606   queue->current = NULL;
607 }
608
609 /* find a range that contains @offset or NULL when nothing does */
610 static GstQueue2Range *
611 find_range (GstQueue2 * queue, guint64 offset)
612 {
613   GstQueue2Range *range = NULL;
614   GstQueue2Range *walk;
615
616   /* first do a quick check for the current range */
617   for (walk = queue->ranges; walk; walk = walk->next) {
618     if (offset >= walk->offset && offset <= walk->writing_pos) {
619       /* we can reuse an existing range */
620       range = walk;
621       break;
622     }
623   }
624   if (range) {
625     GST_DEBUG_OBJECT (queue,
626         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
627         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
628   } else {
629     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
630   }
631   return range;
632 }
633
634 static void
635 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
636 {
637   guint64 max_reading_pos, writing_pos;
638
639   writing_pos = range->writing_pos;
640   max_reading_pos = range->max_reading_pos;
641
642   if (writing_pos > max_reading_pos)
643     queue->cur_level.bytes = writing_pos - max_reading_pos;
644   else
645     queue->cur_level.bytes = 0;
646 }
647
648 /* make a new range for @offset or reuse an existing range */
649 static GstQueue2Range *
650 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
651 {
652   GstQueue2Range *range, *prev, *next;
653
654   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
655
656   if ((range = find_range (queue, offset))) {
657     GST_DEBUG_OBJECT (queue,
658         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
659         range->writing_pos);
660     if (update_existing && range->writing_pos != offset) {
661       GST_DEBUG_OBJECT (queue, "updating range writing position to "
662           "%" G_GUINT64_FORMAT, offset);
663       range->writing_pos = offset;
664     }
665   } else {
666     GST_DEBUG_OBJECT (queue,
667         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
668
669     range = g_slice_new0 (GstQueue2Range);
670     range->offset = offset;
671     /* we want to write to the next location in the ring buffer */
672     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
673     range->writing_pos = offset;
674     range->rb_writing_pos = range->rb_offset;
675     range->reading_pos = offset;
676     range->max_reading_pos = offset;
677
678     /* insert sorted */
679     prev = NULL;
680     next = queue->ranges;
681     while (next) {
682       if (next->offset > offset) {
683         /* insert before next */
684         GST_DEBUG_OBJECT (queue,
685             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
686             next->offset);
687         break;
688       }
689       /* try next */
690       prev = next;
691       next = next->next;
692     }
693     range->next = next;
694     if (prev)
695       prev->next = range;
696     else
697       queue->ranges = range;
698   }
699   debug_ranges (queue);
700
701   /* update the stats for this range */
702   update_cur_level (queue, range);
703
704   return range;
705 }
706
707
708 /* clear and init the download ranges for offset 0 */
709 static void
710 init_ranges (GstQueue2 * queue)
711 {
712   GST_DEBUG_OBJECT (queue, "init queue ranges");
713
714   /* get rid of all the current ranges */
715   clean_ranges (queue);
716   /* make a range for offset 0 */
717   queue->current = add_range (queue, 0, TRUE);
718 }
719
720 /* calculate the diff between running time on the sink and src of the queue.
721  * This is the total amount of time in the queue. */
722 static void
723 update_time_level (GstQueue2 * queue)
724 {
725   if (queue->sink_tainted) {
726     queue->sinktime =
727         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
728         queue->sink_segment.position);
729     queue->sink_tainted = FALSE;
730   }
731
732   if (queue->src_tainted) {
733     queue->srctime =
734         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
735         queue->src_segment.position);
736     queue->src_tainted = FALSE;
737   }
738
739   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
740       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
741
742   if (queue->sinktime != GST_CLOCK_TIME_NONE
743       && queue->srctime != GST_CLOCK_TIME_NONE
744       && queue->sinktime >= queue->srctime)
745     queue->cur_level.time = queue->sinktime - queue->srctime;
746   else
747     queue->cur_level.time = 0;
748 }
749
750 /* take a SEGMENT event and apply the values to segment, updating the time
751  * level of queue. */
752 static void
753 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
754     gboolean is_sink)
755 {
756   gst_event_copy_segment (event, segment);
757
758   if (segment->format == GST_FORMAT_BYTES) {
759     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
760       /* start is where we'll be getting from and as such writing next */
761       queue->current = add_range (queue, segment->start, TRUE);
762     }
763   }
764
765   /* now configure the values, we use these to track timestamps on the
766    * sinkpad. */
767   if (segment->format != GST_FORMAT_TIME) {
768     /* non-time format, pretend the current time segment is closed with a
769      * 0 start and unknown stop time. */
770     segment->format = GST_FORMAT_TIME;
771     segment->start = 0;
772     segment->stop = -1;
773     segment->time = 0;
774   }
775
776   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
777
778   if (is_sink)
779     queue->sink_tainted = TRUE;
780   else
781     queue->src_tainted = TRUE;
782
783   /* segment can update the time level of the queue */
784   update_time_level (queue);
785 }
786
787 static void
788 apply_gap (GstQueue2 * queue, GstEvent * event,
789     GstSegment * segment, gboolean is_sink)
790 {
791   GstClockTime timestamp;
792   GstClockTime duration;
793
794   gst_event_parse_gap (event, &timestamp, &duration);
795
796   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
797
798     if (GST_CLOCK_TIME_IS_VALID (duration)) {
799       timestamp += duration;
800     }
801
802     segment->position = timestamp;
803
804     if (is_sink)
805       queue->sink_tainted = TRUE;
806     else
807       queue->src_tainted = TRUE;
808
809     /* calc diff with other end */
810     update_time_level (queue);
811   }
812 }
813
814 /* take a buffer and update segment, updating the time level of the queue. */
815 static void
816 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
817     guint64 size, gboolean is_sink)
818 {
819   GstClockTime duration, timestamp;
820
821   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
822   duration = GST_BUFFER_DURATION (buffer);
823
824   /* If we have no duration, pick one from the bitrate if we can */
825   if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) {
826     guint bitrate =
827         is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
828     if (bitrate)
829       duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
830   }
831
832   /* if no timestamp is set, assume it's continuous with the previous
833    * time */
834   if (timestamp == GST_CLOCK_TIME_NONE)
835     timestamp = segment->position;
836
837   /* add duration */
838   if (duration != GST_CLOCK_TIME_NONE)
839     timestamp += duration;
840
841   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
842       GST_TIME_ARGS (timestamp));
843
844   segment->position = timestamp;
845
846   if (is_sink)
847     queue->sink_tainted = TRUE;
848   else
849     queue->src_tainted = TRUE;
850
851   /* calc diff with other end */
852   update_time_level (queue);
853 }
854
855 struct BufListData
856 {
857   GstClockTime timestamp;
858   guint bitrate;
859 };
860
861 static gboolean
862 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
863 {
864   struct BufListData *bld = data;
865   GstClockTime *timestamp = &bld->timestamp;
866   GstClockTime btime;
867
868   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
869       " duration %" GST_TIME_FORMAT, idx,
870       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
871       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
872       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
873
874   btime = GST_BUFFER_DTS_OR_PTS (*buf);
875   if (GST_CLOCK_TIME_IS_VALID (btime))
876     *timestamp = btime;
877
878   if (GST_BUFFER_DURATION_IS_VALID (*buf))
879     *timestamp += GST_BUFFER_DURATION (*buf);
880   else if (bld->bitrate != 0) {
881     guint64 size = gst_buffer_get_size (*buf);
882
883     /* If we have no duration, pick one from the bitrate if we can */
884     *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size);
885   }
886
887
888   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
889   return TRUE;
890 }
891
892 /* take a buffer list and update segment, updating the time level of the queue */
893 static void
894 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
895     GstSegment * segment, gboolean is_sink)
896 {
897   struct BufListData bld;
898
899   /* if no timestamp is set, assume it's continuous with the previous time */
900   bld.timestamp = segment->position;
901
902   if (queue->use_tags_bitrate) {
903     if (is_sink)
904       bld.bitrate = queue->sink_tags_bitrate;
905     else
906       bld.bitrate = queue->src_tags_bitrate;
907   } else
908     bld.bitrate = 0;
909
910   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
911
912   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
913       GST_TIME_ARGS (bld.timestamp));
914
915   segment->position = bld.timestamp;
916
917   if (is_sink)
918     queue->sink_tainted = TRUE;
919   else
920     queue->src_tainted = TRUE;
921
922   /* calc diff with other end */
923   update_time_level (queue);
924 }
925
926 static inline gint
927 normalize_to_buffering_level (guint64 cur_level, guint64 max_level,
928     guint64 alt_max)
929 {
930   guint64 p;
931
932   if (max_level == 0)
933     return 0;
934
935   if (alt_max > 0)
936     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL,
937         MIN (max_level, alt_max));
938   else
939     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level);
940
941   return MIN (p, MAX_BUFFERING_LEVEL);
942 }
943
944 static gboolean
945 get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
946     gint * buffering_level)
947 {
948   gint buflevel, buflevel2;
949
950   if (queue->high_watermark <= 0) {
951     if (buffering_level)
952       *buffering_level = MAX_BUFFERING_LEVEL;
953     if (is_buffering)
954       *is_buffering = FALSE;
955     return FALSE;
956   }
957 #define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
958     normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
959
960   if (queue->is_eos) {
961     /* on EOS we are always 100% full, we set the var here so that it we can
962      * reuse the logic below to stop buffering */
963     buflevel = MAX_BUFFERING_LEVEL;
964     GST_LOG_OBJECT (queue, "we are EOS");
965   } else {
966     GST_LOG_OBJECT (queue,
967         "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
968         queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
969         queue->cur_level.buffers);
970
971     /* figure out the buffering level we are filled, we take the max of all formats. */
972     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
973       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0);
974     } else {
975       guint64 rb_size = queue->ring_buffer_max_size;
976       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size);
977     }
978
979     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0);
980     buflevel = MAX (buflevel, buflevel2);
981
982     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0);
983     buflevel = MAX (buflevel, buflevel2);
984
985     /* also apply the rate estimate when we need to */
986     if (queue->use_rate_estimate) {
987       buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0);
988       buflevel = MAX (buflevel, buflevel2);
989     }
990
991     /* Don't get to 0% unless we're really empty */
992     if (queue->cur_level.bytes > 0)
993       buflevel = MAX (1, buflevel);
994   }
995 #undef GET_BUFFER_LEVEL_FOR_QUANTITY
996
997   if (is_buffering)
998     *is_buffering = queue->is_buffering;
999
1000   if (buffering_level)
1001     *buffering_level = buflevel;
1002
1003   GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering,
1004       buflevel);
1005
1006   return TRUE;
1007 }
1008
1009 static gint
1010 convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level)
1011 {
1012   int percent;
1013
1014   /* scale so that if buffering_level equals the high watermark,
1015    * the percentage is 100% */
1016   percent = buffering_level * 100 / queue->high_watermark;
1017   /* clip */
1018   if (percent > 100)
1019     percent = 100;
1020
1021   return percent;
1022 }
1023
1024 static void
1025 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
1026     gint * avg_in, gint * avg_out, gint64 * buffering_left)
1027 {
1028   if (mode) {
1029     if (!QUEUE_IS_USING_QUEUE (queue)) {
1030       if (QUEUE_IS_USING_RING_BUFFER (queue))
1031         *mode = GST_BUFFERING_TIMESHIFT;
1032       else
1033         *mode = GST_BUFFERING_DOWNLOAD;
1034     } else {
1035       *mode = GST_BUFFERING_STREAM;
1036     }
1037   }
1038
1039   if (avg_in)
1040     *avg_in = queue->byte_in_rate;
1041   if (avg_out)
1042     *avg_out = queue->byte_out_rate;
1043
1044   if (buffering_left) {
1045     *buffering_left = (percent == 100 ? 0 : -1);
1046
1047     if (queue->use_rate_estimate) {
1048       guint64 max, cur;
1049
1050       max = queue->max_level.rate_time;
1051       cur = queue->cur_level.rate_time;
1052
1053       if (percent != 100 && max > cur)
1054         *buffering_left = (max - cur) / 1000000;
1055     }
1056   }
1057 }
1058
1059 /* Called with the lock taken */
1060 static GstMessage *
1061 gst_queue2_get_buffering_message (GstQueue2 * queue)
1062 {
1063   GstMessage *msg = NULL;
1064
1065   if (queue->percent_changed) {
1066     gint percent = queue->buffering_percent;
1067
1068     queue->percent_changed = FALSE;
1069
1070     GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
1071     msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
1072
1073     gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
1074         queue->avg_out, queue->buffering_left);
1075   }
1076
1077   return msg;
1078 }
1079
1080 static void
1081 gst_queue2_post_buffering (GstQueue2 * queue)
1082 {
1083   GstMessage *msg = NULL;
1084
1085   g_mutex_lock (&queue->buffering_post_lock);
1086   GST_QUEUE2_MUTEX_LOCK (queue);
1087   msg = gst_queue2_get_buffering_message (queue);
1088   GST_QUEUE2_MUTEX_UNLOCK (queue);
1089
1090   if (msg != NULL)
1091     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
1092
1093   g_mutex_unlock (&queue->buffering_post_lock);
1094 }
1095
1096 static void
1097 update_buffering (GstQueue2 * queue)
1098 {
1099   gint buffering_level, percent;
1100
1101   /* Ensure the variables used to calculate buffering state are up-to-date. */
1102   if (queue->current)
1103     update_cur_level (queue, queue->current);
1104   update_in_rates (queue, FALSE);
1105
1106   if (!get_buffering_level (queue, NULL, &buffering_level))
1107     return;
1108
1109   percent = convert_to_buffering_percent (queue, buffering_level);
1110
1111   if (queue->is_buffering) {
1112     /* if we were buffering see if we reached the high watermark */
1113     if (percent >= 100)
1114       queue->is_buffering = FALSE;
1115
1116     SET_PERCENT (queue, percent);
1117   } else {
1118     /* we were not buffering, check if we need to start buffering if we drop
1119      * below the low threshold */
1120     if (buffering_level < queue->low_watermark) {
1121       queue->is_buffering = TRUE;
1122       SET_PERCENT (queue, percent);
1123     }
1124   }
1125 }
1126
1127 static void
1128 reset_rate_timer (GstQueue2 * queue)
1129 {
1130   queue->bytes_in = 0;
1131   queue->bytes_out = 0;
1132   queue->byte_in_rate = 0.0;
1133   queue->byte_in_period = 0;
1134   queue->byte_out_rate = 0.0;
1135   queue->last_update_in_rates_elapsed = 0.0;
1136   queue->last_in_elapsed = 0.0;
1137   queue->last_out_elapsed = 0.0;
1138   queue->in_timer_started = FALSE;
1139   queue->out_timer_started = FALSE;
1140 }
1141
1142 /* the interval in seconds to recalculate the rate */
1143 #define RATE_INTERVAL    0.2
1144 /* Tuning for rate estimation. We use a large window for the input rate because
1145  * it should be stable when connected to a network. The output rate is less
1146  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1147  * therefore adapt more quickly.
1148  * However, initial input rate may be subject to a burst, and should therefore
1149  * initially also adapt more quickly to changes, and only later on give higher
1150  * weight to previous values. */
1151 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1152 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1153
1154 static void
1155 update_in_rates (GstQueue2 * queue, gboolean force)
1156 {
1157   gdouble elapsed, period;
1158   gdouble byte_in_rate;
1159
1160   if (!queue->in_timer_started) {
1161     queue->in_timer_started = TRUE;
1162     g_timer_start (queue->in_timer);
1163     return;
1164   }
1165
1166   queue->last_update_in_rates_elapsed = elapsed =
1167       g_timer_elapsed (queue->in_timer, NULL);
1168
1169   /* recalc after each interval. */
1170   if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1171     period = elapsed - queue->last_in_elapsed;
1172
1173     GST_DEBUG_OBJECT (queue,
1174         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1175         period, queue->bytes_in, queue->byte_in_period);
1176
1177     byte_in_rate = queue->bytes_in / period;
1178
1179     if (queue->byte_in_rate == 0.0)
1180       queue->byte_in_rate = byte_in_rate;
1181     else
1182       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1183           (double) queue->byte_in_period, period);
1184
1185     /* another data point, cap at 16 for long time running average */
1186     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1187       queue->byte_in_period += period;
1188
1189     /* reset the values to calculate rate over the next interval */
1190     queue->last_in_elapsed = elapsed;
1191     queue->bytes_in = 0;
1192   }
1193
1194   if (queue->byte_in_rate > 0.0) {
1195     queue->cur_level.rate_time =
1196         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1197   }
1198   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1199       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1200 }
1201
1202 static void
1203 update_out_rates (GstQueue2 * queue)
1204 {
1205   gdouble elapsed, period;
1206   gdouble byte_out_rate;
1207
1208   if (!queue->out_timer_started) {
1209     queue->out_timer_started = TRUE;
1210     g_timer_start (queue->out_timer);
1211     return;
1212   }
1213
1214   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1215
1216   /* recalc after each interval. */
1217   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1218     period = elapsed - queue->last_out_elapsed;
1219
1220     GST_DEBUG_OBJECT (queue,
1221         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1222
1223     byte_out_rate = queue->bytes_out / period;
1224
1225     if (queue->byte_out_rate == 0.0)
1226       queue->byte_out_rate = byte_out_rate;
1227     else
1228       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1229
1230     /* reset the values to calculate rate over the next interval */
1231     queue->last_out_elapsed = elapsed;
1232     queue->bytes_out = 0;
1233   }
1234   if (queue->byte_in_rate > 0.0) {
1235     queue->cur_level.rate_time =
1236         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1237   }
1238   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1239       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1240 }
1241
1242 static void
1243 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1244 {
1245   guint64 reading_pos, max_reading_pos;
1246
1247   reading_pos = pos;
1248   max_reading_pos = range->max_reading_pos;
1249
1250   max_reading_pos = MAX (max_reading_pos, reading_pos);
1251
1252   GST_DEBUG_OBJECT (queue,
1253       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1254       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1255   range->max_reading_pos = max_reading_pos;
1256
1257   update_cur_level (queue, range);
1258 }
1259
1260 static gboolean
1261 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1262 {
1263   GstEvent *event;
1264   gboolean res;
1265
1266   /* until we receive the FLUSH_STOP from this seek, we skip data */
1267   queue->seeking = TRUE;
1268   GST_QUEUE2_MUTEX_UNLOCK (queue);
1269
1270   debug_ranges (queue);
1271
1272   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1273
1274   event =
1275       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1276       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1277       GST_SEEK_TYPE_NONE, -1);
1278
1279   res = gst_pad_push_event (queue->sinkpad, event);
1280   GST_QUEUE2_MUTEX_LOCK (queue);
1281
1282   if (res) {
1283     /* Between us sending the seek event and re-acquiring the lock, the source
1284      * thread might already have pushed data and moved along the range's
1285      * writing_pos beyond the seek offset. In that case we don't want to set
1286      * the writing position back to the requested seek position, as it would
1287      * cause data to be written to the wrong offset in the file or ring buffer.
1288      * We still do the add_range call to switch the current range to the
1289      * requested range, or create one if one doesn't exist yet. */
1290     queue->current = add_range (queue, offset, FALSE);
1291   }
1292
1293   return res;
1294 }
1295
1296 /* get the threshold for when we decide to seek rather than wait */
1297 static guint64
1298 get_seek_threshold (GstQueue2 * queue)
1299 {
1300   guint64 threshold;
1301
1302   /* FIXME, find a good threshold based on the incoming rate. */
1303   threshold = 1024 * 512;
1304
1305   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1306     threshold = MIN (threshold,
1307         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1308   }
1309   return threshold;
1310 }
1311
1312 /* see if there is enough data in the file to read a full buffer */
1313 static gboolean
1314 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1315 {
1316   GstQueue2Range *range;
1317
1318   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1319       offset, length);
1320
1321   if ((range = find_range (queue, offset))) {
1322     if (queue->current != range) {
1323       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1324       perform_seek_to_offset (queue, range->writing_pos);
1325     }
1326
1327     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1328         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1329
1330     /* we have a range for offset */
1331     GST_DEBUG_OBJECT (queue,
1332         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1333         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1334
1335     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1336       return TRUE;
1337
1338     if (offset + length <= range->writing_pos)
1339       return TRUE;
1340     else
1341       GST_DEBUG_OBJECT (queue,
1342           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1343           (offset + length) - range->writing_pos);
1344
1345   } else {
1346     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1347         " len %u", offset, length);
1348     /* we don't have the range, see how far away we are */
1349     if (!queue->is_eos && queue->current) {
1350       guint64 threshold = get_seek_threshold (queue);
1351
1352       if (offset >= queue->current->offset && offset <=
1353           queue->current->writing_pos + threshold) {
1354         GST_INFO_OBJECT (queue,
1355             "requested data is within range, wait for data");
1356         return FALSE;
1357       }
1358     }
1359
1360     /* too far away, do a seek */
1361     perform_seek_to_offset (queue, offset);
1362   }
1363
1364   return FALSE;
1365 }
1366
1367 #ifdef HAVE_FSEEKO
1368 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1369 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1370 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1371 #else
1372 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1373 #endif
1374
1375 static GstFlowReturn
1376 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1377     guint8 * dst, gint64 * read_return)
1378 {
1379   guint8 *ring_buffer;
1380   size_t res;
1381
1382   ring_buffer = queue->ring_buffer;
1383
1384   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1385     goto seek_failed;
1386
1387   /* this should not block */
1388   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1389       length, offset);
1390   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1391     res = fread (dst, 1, length, queue->temp_file);
1392   } else {
1393     memcpy (dst, ring_buffer + offset, length);
1394     res = length;
1395   }
1396
1397   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1398
1399   if (G_UNLIKELY (res < length)) {
1400     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1401       goto could_not_read;
1402     /* check for errors or EOF */
1403     if (ferror (queue->temp_file))
1404       goto could_not_read;
1405     if (feof (queue->temp_file) && length > 0)
1406       goto eos;
1407   }
1408
1409   *read_return = res;
1410
1411   return GST_FLOW_OK;
1412
1413 seek_failed:
1414   {
1415     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1416     return GST_FLOW_ERROR;
1417   }
1418 could_not_read:
1419   {
1420     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1421     return GST_FLOW_ERROR;
1422   }
1423 eos:
1424   {
1425     GST_DEBUG ("non-regular file hits EOS");
1426     return GST_FLOW_EOS;
1427   }
1428 }
1429
1430 static GstFlowReturn
1431 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1432     GstBuffer ** buffer)
1433 {
1434   GstBuffer *buf;
1435   GstMapInfo info;
1436   guint8 *data;
1437   guint64 file_offset;
1438   guint block_length, remaining, read_length;
1439   guint64 rb_size;
1440   guint64 max_size;
1441   guint64 rpos;
1442   GstFlowReturn ret = GST_FLOW_OK;
1443
1444   /* allocate the output buffer of the requested size */
1445   if (*buffer == NULL)
1446     buf = gst_buffer_new_allocate (NULL, length, NULL);
1447   else
1448     buf = *buffer;
1449
1450   if (!gst_buffer_map (buf, &info, GST_MAP_WRITE))
1451     goto buffer_write_fail;
1452   data = info.data;
1453
1454   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1455       offset);
1456
1457   rpos = offset;
1458   rb_size = queue->ring_buffer_max_size;
1459   max_size = QUEUE_MAX_BYTES (queue);
1460
1461   remaining = length;
1462   while (remaining > 0) {
1463     /* configure how much/whether to read */
1464     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1465       read_length = 0;
1466
1467       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1468         guint64 level;
1469
1470         /* calculate how far away the offset is */
1471         if (queue->current->writing_pos > rpos)
1472           level = queue->current->writing_pos - rpos;
1473         else
1474           level = 0;
1475
1476         GST_DEBUG_OBJECT (queue,
1477             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1478             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1479             rpos, queue->current->writing_pos, level, max_size);
1480
1481         if (level >= max_size) {
1482           /* we don't have the data but if we have a ring buffer that is full, we
1483            * need to read */
1484           GST_DEBUG_OBJECT (queue,
1485               "ring buffer full, reading QUEUE_MAX_BYTES %"
1486               G_GUINT64_FORMAT " bytes", max_size);
1487           read_length = max_size;
1488         } else if (queue->is_eos) {
1489           /* won't get any more data so read any data we have */
1490           if (level) {
1491             GST_DEBUG_OBJECT (queue,
1492                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1493                 level);
1494             read_length = level;
1495             remaining = level;
1496             length = level;
1497           } else
1498             goto hit_eos;
1499         }
1500       }
1501
1502       if (read_length == 0) {
1503         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1504           GST_DEBUG_OBJECT (queue,
1505               "update current position [%" G_GUINT64_FORMAT "-%"
1506               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1507           update_cur_pos (queue, queue->current, rpos);
1508           GST_QUEUE2_SIGNAL_DEL (queue);
1509         }
1510
1511         if (queue->use_buffering)
1512           update_buffering (queue);
1513
1514         GST_DEBUG_OBJECT (queue, "waiting for add");
1515         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1516         continue;
1517       }
1518     } else {
1519       /* we have the requested data so read it */
1520       read_length = remaining;
1521     }
1522
1523     /* set range reading_pos to actual reading position for this read */
1524     queue->current->reading_pos = rpos;
1525
1526     /* configure how much and from where to read */
1527     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1528       file_offset =
1529           (queue->current->rb_offset + (rpos -
1530               queue->current->offset)) % rb_size;
1531       if (file_offset + read_length > rb_size) {
1532         block_length = rb_size - file_offset;
1533       } else {
1534         block_length = read_length;
1535       }
1536     } else {
1537       file_offset = rpos;
1538       block_length = read_length;
1539     }
1540
1541     /* while we still have data to read, we loop */
1542     while (read_length > 0) {
1543       gint64 read_return;
1544
1545       ret =
1546           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1547           data, &read_return);
1548       if (ret != GST_FLOW_OK)
1549         goto read_error;
1550
1551       file_offset += read_return;
1552       if (QUEUE_IS_USING_RING_BUFFER (queue))
1553         file_offset %= rb_size;
1554
1555       data += read_return;
1556       read_length -= read_return;
1557       block_length = read_length;
1558       remaining -= read_return;
1559
1560       rpos = (queue->current->reading_pos += read_return);
1561       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1562     }
1563     GST_QUEUE2_SIGNAL_DEL (queue);
1564     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1565   }
1566
1567   gst_buffer_unmap (buf, &info);
1568   gst_buffer_resize (buf, 0, length);
1569
1570   GST_BUFFER_OFFSET (buf) = offset;
1571   GST_BUFFER_OFFSET_END (buf) = offset + length;
1572
1573   *buffer = buf;
1574
1575   return ret;
1576
1577   /* ERRORS */
1578 hit_eos:
1579   {
1580     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1581     gst_buffer_unmap (buf, &info);
1582     if (*buffer == NULL)
1583       gst_buffer_unref (buf);
1584     return GST_FLOW_EOS;
1585   }
1586 out_flushing:
1587   {
1588     GST_DEBUG_OBJECT (queue, "we are flushing");
1589     gst_buffer_unmap (buf, &info);
1590     if (*buffer == NULL)
1591       gst_buffer_unref (buf);
1592     return GST_FLOW_FLUSHING;
1593   }
1594 read_error:
1595   {
1596     GST_DEBUG_OBJECT (queue, "we have a read error");
1597     gst_buffer_unmap (buf, &info);
1598     if (*buffer == NULL)
1599       gst_buffer_unref (buf);
1600     return ret;
1601   }
1602 buffer_write_fail:
1603   {
1604     GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL),
1605         ("Can't write to buffer"));
1606     if (*buffer == NULL)
1607       gst_buffer_unref (buf);
1608     return GST_FLOW_ERROR;
1609   }
1610 }
1611
1612 /* should be called with QUEUE_LOCK */
1613 static GstMiniObject *
1614 gst_queue2_read_item_from_file (GstQueue2 * queue)
1615 {
1616   GstMiniObject *item;
1617
1618   if (queue->stream_start_event != NULL) {
1619     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1620     queue->stream_start_event = NULL;
1621   } else if (queue->starting_segment != NULL) {
1622     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1623     queue->starting_segment = NULL;
1624   } else {
1625     GstFlowReturn ret;
1626     GstBuffer *buffer = NULL;
1627     guint64 reading_pos;
1628
1629     reading_pos = queue->current->reading_pos;
1630
1631     ret =
1632         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1633         &buffer);
1634
1635     switch (ret) {
1636       case GST_FLOW_OK:
1637         item = GST_MINI_OBJECT_CAST (buffer);
1638         break;
1639       case GST_FLOW_EOS:
1640         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1641         break;
1642       default:
1643         item = NULL;
1644         break;
1645     }
1646   }
1647   return item;
1648 }
1649
1650 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1651  * the temp filename. */
1652 static gboolean
1653 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1654 {
1655   gint fd = -1;
1656   gchar *name = NULL;
1657
1658   if (queue->temp_file)
1659     goto already_opened;
1660
1661   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1662
1663   /* If temp_template was set, allocate a filename and open that file */
1664
1665   /* nothing to do */
1666   if (queue->temp_template == NULL)
1667     goto no_directory;
1668
1669   /* make copy of the template, we don't want to change this */
1670   name = g_strdup (queue->temp_template);
1671
1672 #ifdef __BIONIC__
1673   fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1674 #else
1675   fd = g_mkstemp (name);
1676 #endif
1677
1678   if (fd == -1)
1679     goto mkstemp_failed;
1680
1681   /* open the file for update/writing */
1682   queue->temp_file = fdopen (fd, "wb+");
1683   /* error creating file */
1684   if (queue->temp_file == NULL)
1685     goto open_failed;
1686
1687   g_free (queue->temp_location);
1688   queue->temp_location = name;
1689
1690   GST_QUEUE2_MUTEX_UNLOCK (queue);
1691
1692   /* we can't emit the notify with the lock */
1693   g_object_notify (G_OBJECT (queue), "temp-location");
1694
1695   GST_QUEUE2_MUTEX_LOCK (queue);
1696
1697   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1698
1699   return TRUE;
1700
1701   /* ERRORS */
1702 already_opened:
1703   {
1704     GST_DEBUG_OBJECT (queue, "temp file was already open");
1705     return TRUE;
1706   }
1707 no_directory:
1708   {
1709     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1710         (_("No Temp directory specified.")), (NULL));
1711     return FALSE;
1712   }
1713 mkstemp_failed:
1714   {
1715     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1716         (_("Could not create temp file \"%s\"."), queue->temp_template),
1717         GST_ERROR_SYSTEM);
1718     g_free (name);
1719     return FALSE;
1720   }
1721 open_failed:
1722   {
1723     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1724         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1725     g_free (name);
1726     if (fd != -1)
1727       close (fd);
1728     return FALSE;
1729   }
1730 }
1731
1732 static void
1733 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1734 {
1735   /* nothing to do */
1736   if (queue->temp_file == NULL)
1737     return;
1738
1739   GST_DEBUG_OBJECT (queue, "closing temp file");
1740
1741   fflush (queue->temp_file);
1742   fclose (queue->temp_file);
1743
1744   if (queue->temp_remove) {
1745     if (remove (queue->temp_location) < 0) {
1746       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1747           queue->temp_location, g_strerror (errno));
1748     }
1749   }
1750
1751   queue->temp_file = NULL;
1752   clean_ranges (queue);
1753 }
1754
1755 static void
1756 gst_queue2_flush_temp_file (GstQueue2 * queue)
1757 {
1758   if (queue->temp_file == NULL)
1759     return;
1760
1761   GST_DEBUG_OBJECT (queue, "flushing temp file");
1762
1763   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1764 }
1765
1766 static void
1767 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1768 {
1769   if (!QUEUE_IS_USING_QUEUE (queue)) {
1770     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1771       gst_queue2_flush_temp_file (queue);
1772     init_ranges (queue);
1773   } else {
1774     while (!g_queue_is_empty (&queue->queue)) {
1775       GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1776
1777       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1778           && GST_EVENT_IS_STICKY (qitem->item)
1779           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1780           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1781         gst_pad_store_sticky_event (queue->srcpad,
1782             GST_EVENT_CAST (qitem->item));
1783       }
1784
1785       /* Then lose another reference because we are supposed to destroy that
1786          data when flushing */
1787       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1788         gst_mini_object_unref (qitem->item);
1789       g_slice_free (GstQueue2Item, qitem);
1790     }
1791   }
1792   queue->last_query = FALSE;
1793   g_cond_signal (&queue->query_handled);
1794   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1795   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1796   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1797   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1798   queue->sink_tainted = queue->src_tainted = TRUE;
1799   if (queue->starting_segment != NULL)
1800     gst_event_unref (queue->starting_segment);
1801   queue->starting_segment = NULL;
1802   queue->segment_event_received = FALSE;
1803   gst_event_replace (&queue->stream_start_event, NULL);
1804
1805   /* we deleted a lot of something */
1806   GST_QUEUE2_SIGNAL_DEL (queue);
1807 }
1808
1809 static gboolean
1810 gst_queue2_wait_free_space (GstQueue2 * queue)
1811 {
1812   /* We make space available if we're "full" according to whatever
1813    * the user defined as "full". */
1814   if (gst_queue2_is_filled (queue)) {
1815     gboolean started;
1816
1817     /* pause the timer while we wait. The fact that we are waiting does not mean
1818      * the byterate on the input pad is lower */
1819     if ((started = queue->in_timer_started))
1820       g_timer_stop (queue->in_timer);
1821
1822     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1823         "queue is full, waiting for free space");
1824     do {
1825       /* Wait for space to be available, we could be unlocked because of a flush. */
1826       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1827     }
1828     while (gst_queue2_is_filled (queue));
1829
1830     /* and continue if we were running before */
1831     if (started)
1832       g_timer_continue (queue->in_timer);
1833   }
1834   return TRUE;
1835
1836   /* ERRORS */
1837 out_flushing:
1838   {
1839     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1840     return FALSE;
1841   }
1842 }
1843
1844 static gboolean
1845 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1846 {
1847   GstMapInfo info;
1848   guint8 *data, *ring_buffer;
1849   guint size, rb_size;
1850   guint64 writing_pos, new_writing_pos;
1851   GstQueue2Range *range, *prev, *next;
1852   gboolean do_seek = FALSE;
1853
1854   if (QUEUE_IS_USING_RING_BUFFER (queue))
1855     writing_pos = queue->current->rb_writing_pos;
1856   else
1857     writing_pos = queue->current->writing_pos;
1858   ring_buffer = queue->ring_buffer;
1859   rb_size = queue->ring_buffer_max_size;
1860
1861   if (!gst_buffer_map (buffer, &info, GST_MAP_READ))
1862     goto buffer_read_error;
1863
1864   size = info.size;
1865   data = info.data;
1866
1867   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1868       writing_pos);
1869
1870   /* sanity check */
1871   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1872       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1873     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1874         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1875         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1876   }
1877
1878   while (size > 0) {
1879     guint to_write;
1880
1881     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1882       gint64 space;
1883
1884       /* calculate the space in the ring buffer not used by data from
1885        * the current range */
1886       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1887         /* wait until there is some free space */
1888         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1889       }
1890       /* get the amount of space we have */
1891       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1892
1893       /* calculate if we need to split or if we can write the entire
1894        * buffer now */
1895       to_write = MIN (size, space);
1896
1897       /* the writing position in the ring buffer after writing (part
1898        * or all of) the buffer */
1899       new_writing_pos = (writing_pos + to_write) % rb_size;
1900
1901       prev = NULL;
1902       range = queue->ranges;
1903
1904       /* if we need to overwrite data in the ring buffer, we need to
1905        * update the ranges
1906        *
1907        * warning: this code is complicated and includes some
1908        * simplifications - pen, paper and diagrams for the cases
1909        * recommended! */
1910       while (range) {
1911         guint64 range_data_start, range_data_end;
1912         GstQueue2Range *range_to_destroy = NULL;
1913
1914         if (range == queue->current)
1915           goto next_range;
1916
1917         range_data_start = range->rb_offset;
1918         range_data_end = range->rb_writing_pos;
1919
1920         /* handle the special case where the range has no data in it */
1921         if (range->writing_pos == range->offset) {
1922           if (range != queue->current) {
1923             GST_DEBUG_OBJECT (queue,
1924                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1925                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1926             /* remove range */
1927             range_to_destroy = range;
1928             if (prev)
1929               prev->next = range->next;
1930           }
1931           goto next_range;
1932         }
1933
1934         if (range_data_end > range_data_start) {
1935           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1936             goto next_range;
1937
1938           if (new_writing_pos > range_data_start) {
1939             if (new_writing_pos >= range_data_end) {
1940               GST_DEBUG_OBJECT (queue,
1941                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1942                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1943               /* remove range */
1944               range_to_destroy = range;
1945               if (prev)
1946                 prev->next = range->next;
1947             } else {
1948               GST_DEBUG_OBJECT (queue,
1949                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1950                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1951                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1952                   range->offset + new_writing_pos - range_data_start,
1953                   new_writing_pos);
1954               range->offset += (new_writing_pos - range_data_start);
1955               range->rb_offset = new_writing_pos;
1956             }
1957           }
1958         } else {
1959           guint64 new_wpos_virt = writing_pos + to_write;
1960
1961           if (new_wpos_virt <= range_data_start)
1962             goto next_range;
1963
1964           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1965             GST_DEBUG_OBJECT (queue,
1966                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1967                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1968             /* remove range */
1969             range_to_destroy = range;
1970             if (prev)
1971               prev->next = range->next;
1972           } else {
1973             GST_DEBUG_OBJECT (queue,
1974                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1975                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1976                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1977                 range->offset + new_writing_pos - range_data_start,
1978                 new_writing_pos);
1979             range->offset += (new_wpos_virt - range_data_start);
1980             range->rb_offset = new_writing_pos;
1981           }
1982         }
1983
1984       next_range:
1985         if (!range_to_destroy)
1986           prev = range;
1987
1988         range = range->next;
1989         if (range_to_destroy) {
1990           if (range_to_destroy == queue->ranges)
1991             queue->ranges = range;
1992           g_slice_free (GstQueue2Range, range_to_destroy);
1993           range_to_destroy = NULL;
1994         }
1995       }
1996     } else {
1997       to_write = size;
1998       new_writing_pos = writing_pos + to_write;
1999     }
2000
2001     if (QUEUE_IS_USING_TEMP_FILE (queue)
2002         && FSEEK_FILE (queue->temp_file, writing_pos))
2003       goto seek_failed;
2004
2005     if (new_writing_pos > writing_pos) {
2006       GST_INFO_OBJECT (queue,
2007           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
2008           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
2009           queue->current->writing_pos, queue->current->rb_writing_pos);
2010       /* either not using ring buffer or no wrapping, just write */
2011       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2012         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
2013           goto handle_error;
2014       } else {
2015         memcpy (ring_buffer + writing_pos, data, to_write);
2016       }
2017
2018       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
2019         /* try to merge with next range */
2020         while ((next = queue->current->next)) {
2021           GST_INFO_OBJECT (queue,
2022               "checking merge with next range %" G_GUINT64_FORMAT " < %"
2023               G_GUINT64_FORMAT, new_writing_pos, next->offset);
2024           if (new_writing_pos < next->offset)
2025             break;
2026
2027           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
2028               next->writing_pos);
2029
2030           /* remove the group */
2031           queue->current->next = next->next;
2032
2033           /* We use the threshold to decide if we want to do a seek or simply
2034            * read the data again. If there is not so much data in the range we
2035            * prefer to avoid to seek and read it again. */
2036           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
2037             /* the new range had more data than the threshold, it's worth keeping
2038              * it and doing a seek. */
2039             new_writing_pos = next->writing_pos;
2040             do_seek = TRUE;
2041           }
2042           g_slice_free (GstQueue2Range, next);
2043         }
2044         goto update_and_signal;
2045       }
2046     } else {
2047       /* wrapping */
2048       guint block_one, block_two;
2049
2050       block_one = rb_size - writing_pos;
2051       block_two = to_write - block_one;
2052
2053       if (block_one > 0) {
2054         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
2055         /* write data to end of ring buffer */
2056         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2057           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
2058             goto handle_error;
2059         } else {
2060           memcpy (ring_buffer + writing_pos, data, block_one);
2061         }
2062       }
2063
2064       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
2065         goto seek_failed;
2066
2067       if (block_two > 0) {
2068         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
2069         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2070           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
2071             goto handle_error;
2072         } else {
2073           memcpy (ring_buffer, data + block_one, block_two);
2074         }
2075       }
2076     }
2077
2078   update_and_signal:
2079     /* update the writing positions */
2080     size -= to_write;
2081     GST_INFO_OBJECT (queue,
2082         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
2083         to_write, writing_pos, size);
2084
2085     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2086       data += to_write;
2087       queue->current->writing_pos += to_write;
2088       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
2089     } else {
2090       queue->current->writing_pos = writing_pos = new_writing_pos;
2091     }
2092     if (do_seek)
2093       perform_seek_to_offset (queue, new_writing_pos);
2094
2095     update_cur_level (queue, queue->current);
2096
2097     /* update the buffering status */
2098     if (queue->use_buffering) {
2099       GstMessage *msg;
2100       update_buffering (queue);
2101       msg = gst_queue2_get_buffering_message (queue);
2102       if (msg) {
2103         GST_QUEUE2_MUTEX_UNLOCK (queue);
2104         g_mutex_lock (&queue->buffering_post_lock);
2105         gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
2106         g_mutex_unlock (&queue->buffering_post_lock);
2107         GST_QUEUE2_MUTEX_LOCK (queue);
2108       }
2109     }
2110
2111     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2112         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2113
2114     GST_QUEUE2_SIGNAL_ADD (queue);
2115   }
2116
2117   gst_buffer_unmap (buffer, &info);
2118
2119   return TRUE;
2120
2121   /* ERRORS */
2122 out_flushing:
2123   {
2124     GST_DEBUG_OBJECT (queue, "we are flushing");
2125     gst_buffer_unmap (buffer, &info);
2126     /* FIXME - GST_FLOW_EOS ? */
2127     return FALSE;
2128   }
2129 seek_failed:
2130   {
2131     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2132     gst_buffer_unmap (buffer, &info);
2133     return FALSE;
2134   }
2135 handle_error:
2136   {
2137     switch (errno) {
2138       case ENOSPC:{
2139         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2140         break;
2141       }
2142       default:{
2143         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2144             (_("Error while writing to download file.")),
2145             ("%s", g_strerror (errno)));
2146       }
2147     }
2148     gst_buffer_unmap (buffer, &info);
2149     return FALSE;
2150   }
2151 buffer_read_error:
2152   {
2153     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL),
2154         ("Can't read from buffer"));
2155     return FALSE;
2156   }
2157 }
2158
2159 static gboolean
2160 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2161 {
2162   GstQueue2 *queue = q;
2163
2164   GST_TRACE_OBJECT (queue,
2165       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2166       gst_buffer_get_size (*buf));
2167
2168   if (!gst_queue2_create_write (queue, *buf)) {
2169     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2170     return FALSE;
2171   }
2172   return TRUE;
2173 }
2174
2175 static gboolean
2176 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
2177 {
2178   guint *p_size = data;
2179   gsize buf_size;
2180
2181   buf_size = gst_buffer_get_size (*buf);
2182   GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
2183   *p_size += buf_size;
2184   return TRUE;
2185 }
2186
2187 /* enqueue an item an update the level stats */
2188 static void
2189 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2190     GstQueue2ItemType item_type)
2191 {
2192   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2193     GstBuffer *buffer;
2194     guint size;
2195
2196     buffer = GST_BUFFER_CAST (item);
2197     size = gst_buffer_get_size (buffer);
2198
2199     /* add buffer to the statistics */
2200     if (QUEUE_IS_USING_QUEUE (queue)) {
2201       queue->cur_level.buffers++;
2202       queue->cur_level.bytes += size;
2203     }
2204     queue->bytes_in += size;
2205
2206     /* apply new buffer to segment stats */
2207     apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE);
2208     /* update the byterate stats */
2209     update_in_rates (queue, FALSE);
2210
2211     if (!QUEUE_IS_USING_QUEUE (queue)) {
2212       /* FIXME - check return value? */
2213       gst_queue2_create_write (queue, buffer);
2214     }
2215   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2216     GstBufferList *buffer_list;
2217     guint size = 0;
2218
2219     buffer_list = GST_BUFFER_LIST_CAST (item);
2220
2221     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2222     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2223
2224     /* add buffer to the statistics */
2225     if (QUEUE_IS_USING_QUEUE (queue)) {
2226       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2227       queue->cur_level.bytes += size;
2228     }
2229     queue->bytes_in += size;
2230
2231     /* apply new buffer to segment stats */
2232     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2233
2234     /* update the byterate stats */
2235     update_in_rates (queue, FALSE);
2236
2237     if (!QUEUE_IS_USING_QUEUE (queue)) {
2238       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2239     }
2240   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2241     GstEvent *event;
2242
2243     event = GST_EVENT_CAST (item);
2244
2245     switch (GST_EVENT_TYPE (event)) {
2246       case GST_EVENT_EOS:
2247         /* Zero the thresholds, this makes sure the queue is completely
2248          * filled and we can read all data from the queue. */
2249         GST_DEBUG_OBJECT (queue, "we have EOS");
2250         queue->is_eos = TRUE;
2251         /* Force updating the input bitrate */
2252         update_in_rates (queue, TRUE);
2253         break;
2254       case GST_EVENT_SEGMENT:
2255         apply_segment (queue, event, &queue->sink_segment, TRUE);
2256         /* This is our first new segment, we hold it
2257          * as we can't save it on the temp file */
2258         if (!QUEUE_IS_USING_QUEUE (queue)) {
2259           if (queue->segment_event_received)
2260             goto unexpected_event;
2261
2262           queue->segment_event_received = TRUE;
2263           if (queue->starting_segment != NULL)
2264             gst_event_unref (queue->starting_segment);
2265           queue->starting_segment = event;
2266           item = NULL;
2267         }
2268         /* a new segment allows us to accept more buffers if we got EOS
2269          * from downstream */
2270         queue->unexpected = FALSE;
2271         break;
2272       case GST_EVENT_GAP:
2273         apply_gap (queue, event, &queue->sink_segment, TRUE);
2274         break;
2275       case GST_EVENT_STREAM_START:
2276         if (!QUEUE_IS_USING_QUEUE (queue)) {
2277           gst_event_replace (&queue->stream_start_event, event);
2278           gst_event_unref (event);
2279           item = NULL;
2280         }
2281         break;
2282       case GST_EVENT_CAPS:{
2283         GstCaps *caps;
2284
2285         gst_event_parse_caps (event, &caps);
2286         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2287
2288         if (!QUEUE_IS_USING_QUEUE (queue)) {
2289           GST_LOG ("Dropping caps event, not using queue");
2290           gst_event_unref (event);
2291           item = NULL;
2292         }
2293         break;
2294       }
2295       default:
2296         if (!QUEUE_IS_USING_QUEUE (queue))
2297           goto unexpected_event;
2298         break;
2299     }
2300   } else if (GST_IS_QUERY (item)) {
2301     /* Can't happen as we check that in the caller */
2302     if (!QUEUE_IS_USING_QUEUE (queue))
2303       g_assert_not_reached ();
2304   } else {
2305     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2306         item, GST_OBJECT_NAME (queue));
2307     /* we can't really unref since we don't know what it is */
2308     item = NULL;
2309   }
2310
2311   if (item) {
2312     /* update the buffering status */
2313     if (queue->use_buffering)
2314       update_buffering (queue);
2315
2316     if (QUEUE_IS_USING_QUEUE (queue)) {
2317       GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2318       qitem->type = item_type;
2319       qitem->item = item;
2320       g_queue_push_tail (&queue->queue, qitem);
2321     } else {
2322       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2323     }
2324
2325     GST_QUEUE2_SIGNAL_ADD (queue);
2326   }
2327
2328   return;
2329
2330   /* ERRORS */
2331 unexpected_event:
2332   {
2333     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2334
2335     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2336         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2337         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2338     gst_event_unref (GST_EVENT_CAST (item));
2339     return;
2340   }
2341 }
2342
2343 /* dequeue an item from the queue and update level stats */
2344 static GstMiniObject *
2345 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2346 {
2347   GstMiniObject *item;
2348
2349   if (!QUEUE_IS_USING_QUEUE (queue)) {
2350     item = gst_queue2_read_item_from_file (queue);
2351   } else {
2352     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2353
2354     if (qitem == NULL)
2355       goto no_item;
2356
2357     item = qitem->item;
2358     g_slice_free (GstQueue2Item, qitem);
2359   }
2360
2361   if (item == NULL)
2362     goto no_item;
2363
2364   if (GST_IS_BUFFER (item)) {
2365     GstBuffer *buffer;
2366     guint size;
2367
2368     buffer = GST_BUFFER_CAST (item);
2369     size = gst_buffer_get_size (buffer);
2370     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2371
2372     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2373         "retrieved buffer %p from queue", buffer);
2374
2375     if (QUEUE_IS_USING_QUEUE (queue)) {
2376       queue->cur_level.buffers--;
2377       queue->cur_level.bytes -= size;
2378     }
2379     queue->bytes_out += size;
2380
2381     apply_buffer (queue, buffer, &queue->src_segment, size, FALSE);
2382     /* update the byterate stats */
2383     update_out_rates (queue);
2384     /* update the buffering */
2385     if (queue->use_buffering)
2386       update_buffering (queue);
2387
2388   } else if (GST_IS_EVENT (item)) {
2389     GstEvent *event = GST_EVENT_CAST (item);
2390
2391     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2392
2393     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2394         "retrieved event %p from queue", event);
2395
2396     switch (GST_EVENT_TYPE (event)) {
2397       case GST_EVENT_EOS:
2398         /* queue is empty now that we dequeued the EOS */
2399         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2400         break;
2401       case GST_EVENT_SEGMENT:
2402         apply_segment (queue, event, &queue->src_segment, FALSE);
2403         break;
2404       case GST_EVENT_GAP:
2405         apply_gap (queue, event, &queue->src_segment, FALSE);
2406         break;
2407       default:
2408         break;
2409     }
2410   } else if (GST_IS_BUFFER_LIST (item)) {
2411     GstBufferList *buffer_list;
2412     guint size = 0;
2413
2414     buffer_list = GST_BUFFER_LIST_CAST (item);
2415     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2416     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2417
2418     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2419         "retrieved buffer list %p from queue", buffer_list);
2420
2421     if (QUEUE_IS_USING_QUEUE (queue)) {
2422       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2423       queue->cur_level.bytes -= size;
2424     }
2425     queue->bytes_out += size;
2426
2427     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2428     /* update the byterate stats */
2429     update_out_rates (queue);
2430     /* update the buffering */
2431     if (queue->use_buffering)
2432       update_buffering (queue);
2433   } else if (GST_IS_QUERY (item)) {
2434     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2435         "retrieved query %p from queue", item);
2436     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2437   } else {
2438     g_warning
2439         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2440         item, GST_OBJECT_NAME (queue));
2441     item = NULL;
2442     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2443   }
2444   GST_QUEUE2_SIGNAL_DEL (queue);
2445
2446   return item;
2447
2448   /* ERRORS */
2449 no_item:
2450   {
2451     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2452     return NULL;
2453   }
2454 }
2455
2456 static gboolean
2457 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2458     GstEvent * event)
2459 {
2460   gboolean ret = TRUE;
2461   GstQueue2 *queue;
2462
2463   queue = GST_QUEUE2 (parent);
2464
2465   switch (GST_EVENT_TYPE (event)) {
2466     case GST_EVENT_FLUSH_START:
2467     {
2468       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2469       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2470         /* forward event */
2471         ret = gst_pad_push_event (queue->srcpad, event);
2472
2473         /* now unblock the chain function */
2474         GST_QUEUE2_MUTEX_LOCK (queue);
2475         queue->srcresult = GST_FLOW_FLUSHING;
2476         queue->sinkresult = GST_FLOW_FLUSHING;
2477         /* unblock the loop and chain functions */
2478         GST_QUEUE2_SIGNAL_ADD (queue);
2479         GST_QUEUE2_SIGNAL_DEL (queue);
2480         GST_QUEUE2_MUTEX_UNLOCK (queue);
2481
2482         /* make sure it pauses, this should happen since we sent
2483          * flush_start downstream. */
2484         gst_pad_pause_task (queue->srcpad);
2485         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2486
2487         GST_QUEUE2_MUTEX_LOCK (queue);
2488         queue->last_query = FALSE;
2489         g_cond_signal (&queue->query_handled);
2490         GST_QUEUE2_MUTEX_UNLOCK (queue);
2491       } else {
2492         GST_QUEUE2_MUTEX_LOCK (queue);
2493         /* flush the sink pad */
2494         queue->sinkresult = GST_FLOW_FLUSHING;
2495         GST_QUEUE2_SIGNAL_DEL (queue);
2496         queue->last_query = FALSE;
2497         g_cond_signal (&queue->query_handled);
2498         GST_QUEUE2_MUTEX_UNLOCK (queue);
2499
2500         gst_event_unref (event);
2501       }
2502       break;
2503     }
2504     case GST_EVENT_FLUSH_STOP:
2505     {
2506       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2507
2508       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2509         /* forward event */
2510         ret = gst_pad_push_event (queue->srcpad, event);
2511
2512         GST_QUEUE2_MUTEX_LOCK (queue);
2513         gst_queue2_locked_flush (queue, FALSE, TRUE);
2514         queue->srcresult = GST_FLOW_OK;
2515         queue->sinkresult = GST_FLOW_OK;
2516         queue->is_eos = FALSE;
2517         queue->unexpected = FALSE;
2518         queue->seeking = FALSE;
2519         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2520         /* reset rate counters */
2521         reset_rate_timer (queue);
2522         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2523             queue->srcpad, NULL);
2524         GST_QUEUE2_MUTEX_UNLOCK (queue);
2525       } else {
2526         GST_QUEUE2_MUTEX_LOCK (queue);
2527         queue->segment_event_received = FALSE;
2528         queue->is_eos = FALSE;
2529         queue->unexpected = FALSE;
2530         queue->sinkresult = GST_FLOW_OK;
2531         queue->seeking = FALSE;
2532         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2533         GST_QUEUE2_MUTEX_UNLOCK (queue);
2534
2535         gst_event_unref (event);
2536       }
2537       break;
2538     }
2539     case GST_EVENT_TAG:{
2540       if (queue->use_tags_bitrate) {
2541         GstTagList *tags;
2542         guint bitrate;
2543
2544         gst_event_parse_tag (event, &tags);
2545         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2546             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2547           GST_QUEUE2_MUTEX_LOCK (queue);
2548           queue->sink_tags_bitrate = bitrate;
2549           GST_QUEUE2_MUTEX_UNLOCK (queue);
2550           GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
2551         }
2552       }
2553       /* Fall-through */
2554     }
2555     default:
2556       if (GST_EVENT_IS_SERIALIZED (event)) {
2557         /* serialized events go in the queue */
2558         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2559         if (queue->srcresult != GST_FLOW_OK) {
2560           /* Errors in sticky event pushing are no problem and ignored here
2561            * as they will cause more meaningful errors during data flow.
2562            * For EOS events, that are not followed by data flow, we still
2563            * return FALSE here though and report an error.
2564            */
2565           if (!GST_EVENT_IS_STICKY (event)) {
2566             goto out_flow_error;
2567           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2568             if (queue->srcresult == GST_FLOW_NOT_LINKED
2569                 || queue->srcresult < GST_FLOW_EOS) {
2570               GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2571                   (_("Internal data flow error.")),
2572                   ("streaming task paused, reason %s (%d)",
2573                       gst_flow_get_name (queue->srcresult), queue->srcresult));
2574             }
2575             goto out_flow_error;
2576           }
2577         }
2578         /* refuse more events on EOS */
2579         if (queue->is_eos)
2580           goto out_eos;
2581         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2582         GST_QUEUE2_MUTEX_UNLOCK (queue);
2583         gst_queue2_post_buffering (queue);
2584       } else {
2585         /* non-serialized events are passed upstream. */
2586         ret = gst_pad_push_event (queue->srcpad, event);
2587       }
2588       break;
2589   }
2590   return ret;
2591
2592   /* ERRORS */
2593 out_flushing:
2594   {
2595     GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2596     GST_QUEUE2_MUTEX_UNLOCK (queue);
2597     gst_event_unref (event);
2598     return FALSE;
2599   }
2600 out_eos:
2601   {
2602     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2603     GST_QUEUE2_MUTEX_UNLOCK (queue);
2604     gst_event_unref (event);
2605     return FALSE;
2606   }
2607 out_flow_error:
2608   {
2609     GST_LOG_OBJECT (queue,
2610         "refusing event, we have a downstream flow error: %s",
2611         gst_flow_get_name (queue->srcresult));
2612     GST_QUEUE2_MUTEX_UNLOCK (queue);
2613     gst_event_unref (event);
2614     return FALSE;
2615   }
2616 }
2617
2618 static gboolean
2619 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2620     GstQuery * query)
2621 {
2622   GstQueue2 *queue;
2623   gboolean res;
2624
2625   queue = GST_QUEUE2 (parent);
2626
2627   switch (GST_QUERY_TYPE (query)) {
2628     default:
2629       if (GST_QUERY_IS_SERIALIZED (query)) {
2630         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2631         /* serialized events go in the queue. We need to be certain that we
2632          * don't cause deadlocks waiting for the query return value. We check if
2633          * the queue is empty (nothing is blocking downstream and the query can
2634          * be pushed for sure) or we are not buffering. If we are buffering,
2635          * the pipeline waits to unblock downstream until our queue fills up
2636          * completely, which can not happen if we block on the query..
2637          * Therefore we only potentially block when we are not buffering. */
2638         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2639         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2640                 || !queue->use_buffering)) {
2641           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2642             gst_queue2_locked_enqueue (queue, query,
2643                 GST_QUEUE2_ITEM_TYPE_QUERY);
2644
2645             STATUS (queue, queue->sinkpad, "wait for QUERY");
2646             g_cond_wait (&queue->query_handled, &queue->qlock);
2647             if (queue->sinkresult != GST_FLOW_OK)
2648               goto out_flushing;
2649             res = queue->last_query;
2650           } else {
2651             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2652             res = FALSE;
2653           }
2654         } else {
2655           GST_DEBUG_OBJECT (queue,
2656               "refusing query, we are not using the queue");
2657           res = FALSE;
2658         }
2659         GST_QUEUE2_MUTEX_UNLOCK (queue);
2660         gst_queue2_post_buffering (queue);
2661       } else {
2662         res = gst_pad_query_default (pad, parent, query);
2663       }
2664       break;
2665   }
2666   return res;
2667
2668   /* ERRORS */
2669 out_flushing:
2670   {
2671     GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
2672     GST_QUEUE2_MUTEX_UNLOCK (queue);
2673     return FALSE;
2674   }
2675 }
2676
2677 static gboolean
2678 gst_queue2_is_empty (GstQueue2 * queue)
2679 {
2680   /* never empty on EOS */
2681   if (queue->is_eos)
2682     return FALSE;
2683
2684   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2685     return queue->current->writing_pos <= queue->current->max_reading_pos;
2686   } else {
2687     if (queue->queue.length == 0)
2688       return TRUE;
2689   }
2690
2691   return FALSE;
2692 }
2693
2694 static gboolean
2695 gst_queue2_is_filled (GstQueue2 * queue)
2696 {
2697   gboolean res;
2698
2699   /* always filled on EOS */
2700   if (queue->is_eos)
2701     return TRUE;
2702
2703 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2704     (queue->cur_level.format) >= ((alt_max) ? \
2705       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2706
2707   /* if using a ring buffer we're filled if all ring buffer space is used
2708    * _by the current range_ */
2709   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2710     guint64 rb_size = queue->ring_buffer_max_size;
2711     GST_DEBUG_OBJECT (queue,
2712         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2713         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2714     return CHECK_FILLED (bytes, rb_size);
2715   }
2716
2717   /* if using file, we're never filled if we don't have EOS */
2718   if (QUEUE_IS_USING_TEMP_FILE (queue))
2719     return FALSE;
2720
2721   /* we are never filled when we have no buffers at all */
2722   if (queue->cur_level.buffers == 0)
2723     return FALSE;
2724
2725   /* we are filled if one of the current levels exceeds the max */
2726   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2727       || CHECK_FILLED (time, 0);
2728
2729   /* if we need to, use the rate estimate to check against the max time we are
2730    * allowed to queue */
2731   if (queue->use_rate_estimate)
2732     res |= CHECK_FILLED (rate_time, 0);
2733
2734 #undef CHECK_FILLED
2735   return res;
2736 }
2737
2738 static GstFlowReturn
2739 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2740     GstMiniObject * item, GstQueue2ItemType item_type)
2741 {
2742   /* we have to lock the queue since we span threads */
2743   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2744   /* when we received EOS, we refuse more data */
2745   if (queue->is_eos)
2746     goto out_eos;
2747   /* when we received unexpected from downstream, refuse more buffers */
2748   if (queue->unexpected)
2749     goto out_unexpected;
2750
2751   /* while we didn't receive the newsegment, we're seeking and we skip data */
2752   if (queue->seeking)
2753     goto out_seeking;
2754
2755   if (!gst_queue2_wait_free_space (queue))
2756     goto out_flushing;
2757
2758   /* put buffer in queue now */
2759   gst_queue2_locked_enqueue (queue, item, item_type);
2760   GST_QUEUE2_MUTEX_UNLOCK (queue);
2761   gst_queue2_post_buffering (queue);
2762
2763   return GST_FLOW_OK;
2764
2765   /* special conditions */
2766 out_flushing:
2767   {
2768     GstFlowReturn ret = queue->sinkresult;
2769
2770     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2771         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2772     GST_QUEUE2_MUTEX_UNLOCK (queue);
2773     gst_mini_object_unref (item);
2774
2775     return ret;
2776   }
2777 out_eos:
2778   {
2779     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2780     GST_QUEUE2_MUTEX_UNLOCK (queue);
2781     gst_mini_object_unref (item);
2782
2783     return GST_FLOW_EOS;
2784   }
2785 out_seeking:
2786   {
2787     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2788     GST_QUEUE2_MUTEX_UNLOCK (queue);
2789     gst_mini_object_unref (item);
2790
2791     return GST_FLOW_OK;
2792   }
2793 out_unexpected:
2794   {
2795     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2796     GST_QUEUE2_MUTEX_UNLOCK (queue);
2797     gst_mini_object_unref (item);
2798
2799     return GST_FLOW_EOS;
2800   }
2801 }
2802
2803 static GstFlowReturn
2804 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2805 {
2806   GstQueue2 *queue;
2807
2808   queue = GST_QUEUE2 (parent);
2809
2810   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2811       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2812       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2813       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2814       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2815
2816   return gst_queue2_chain_buffer_or_buffer_list (queue,
2817       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2818 }
2819
2820 static GstFlowReturn
2821 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2822     GstBufferList * buffer_list)
2823 {
2824   GstQueue2 *queue;
2825
2826   queue = GST_QUEUE2 (parent);
2827
2828   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2829       "received buffer list %p", buffer_list);
2830
2831   return gst_queue2_chain_buffer_or_buffer_list (queue,
2832       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2833 }
2834
2835 static GstMiniObject *
2836 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2837 {
2838   GstMiniObject *data;
2839
2840   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2841
2842   /* stop pushing buffers, we dequeue all items until we see an item that we
2843    * can push again, which is EOS or SEGMENT. If there is nothing in the
2844    * queue we can push, we set a flag to make the sinkpad refuse more
2845    * buffers with an EOS return value until we receive something
2846    * pushable again or we get flushed. */
2847   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2848     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2849       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2850           "dropping EOS buffer %p", data);
2851       gst_buffer_unref (GST_BUFFER_CAST (data));
2852     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2853       GstEvent *event = GST_EVENT_CAST (data);
2854       GstEventType type = GST_EVENT_TYPE (event);
2855
2856       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2857         /* we found a pushable item in the queue, push it out */
2858         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2859             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2860         return data;
2861       }
2862       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2863           "dropping EOS event %p", event);
2864       gst_event_unref (event);
2865     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2866       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2867           "dropping EOS buffer list %p", data);
2868       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2869     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2870       queue->last_query = FALSE;
2871       g_cond_signal (&queue->query_handled);
2872       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2873     }
2874   }
2875   /* no more items in the queue. Set the unexpected flag so that upstream
2876    * make us refuse any more buffers on the sinkpad. Since we will still
2877    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2878    * task function does not shut down. */
2879   queue->unexpected = TRUE;
2880   return NULL;
2881 }
2882
2883 /* dequeue an item from the queue an push it downstream. This functions returns
2884  * the result of the push. */
2885 static GstFlowReturn
2886 gst_queue2_push_one (GstQueue2 * queue)
2887 {
2888   GstFlowReturn result = queue->srcresult;
2889   GstMiniObject *data;
2890   GstQueue2ItemType item_type;
2891
2892   data = gst_queue2_locked_dequeue (queue, &item_type);
2893   if (data == NULL)
2894     goto no_item;
2895
2896 next:
2897   STATUS (queue, queue->srcpad, "We have something dequeud");
2898   g_atomic_int_set (&queue->downstream_may_block,
2899       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2900       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2901   GST_QUEUE2_MUTEX_UNLOCK (queue);
2902   gst_queue2_post_buffering (queue);
2903
2904   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2905     GstBuffer *buffer;
2906
2907     buffer = GST_BUFFER_CAST (data);
2908
2909     result = gst_pad_push (queue->srcpad, buffer);
2910     g_atomic_int_set (&queue->downstream_may_block, 0);
2911
2912     /* need to check for srcresult here as well */
2913     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2914     if (result == GST_FLOW_EOS) {
2915       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2916       if (data != NULL)
2917         goto next;
2918       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2919        * to the caller so that the task function does not shut down */
2920       result = GST_FLOW_OK;
2921     }
2922   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2923     GstEvent *event = GST_EVENT_CAST (data);
2924     GstEventType type = GST_EVENT_TYPE (event);
2925
2926     if (type == GST_EVENT_TAG) {
2927       if (queue->use_tags_bitrate) {
2928         GstTagList *tags;
2929         guint bitrate;
2930
2931         gst_event_parse_tag (event, &tags);
2932         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2933             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2934           GST_QUEUE2_MUTEX_LOCK (queue);
2935           queue->src_tags_bitrate = bitrate;
2936           GST_QUEUE2_MUTEX_UNLOCK (queue);
2937           GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
2938         }
2939       }
2940     }
2941
2942     gst_pad_push_event (queue->srcpad, event);
2943
2944     /* if we're EOS, return EOS so that the task pauses. */
2945     if (type == GST_EVENT_EOS) {
2946       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2947           "pushed EOS event %p, return EOS", event);
2948       result = GST_FLOW_EOS;
2949     }
2950
2951     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2952   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2953     GstBufferList *buffer_list;
2954
2955     buffer_list = GST_BUFFER_LIST_CAST (data);
2956
2957     result = gst_pad_push_list (queue->srcpad, buffer_list);
2958     g_atomic_int_set (&queue->downstream_may_block, 0);
2959
2960     /* need to check for srcresult here as well */
2961     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2962     if (result == GST_FLOW_EOS) {
2963       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2964       if (data != NULL)
2965         goto next;
2966       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2967        * to the caller so that the task function does not shut down */
2968       result = GST_FLOW_OK;
2969     }
2970   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2971     GstQuery *query = GST_QUERY_CAST (data);
2972
2973     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2974     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2975     GST_LOG_OBJECT (queue->srcpad, "Peered query");
2976     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2977         "did query %p, return %d", query, queue->last_query);
2978     g_cond_signal (&queue->query_handled);
2979     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2980     result = GST_FLOW_OK;
2981   }
2982   return result;
2983
2984   /* ERRORS */
2985 no_item:
2986   {
2987     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2988         "exit because we have no item in the queue");
2989     return GST_FLOW_ERROR;
2990   }
2991 out_flushing:
2992   {
2993     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2994     return GST_FLOW_FLUSHING;
2995   }
2996 }
2997
2998 /* called repeatedly with @pad as the source pad. This function should push out
2999  * data to the peer element. */
3000 static void
3001 gst_queue2_loop (GstPad * pad)
3002 {
3003   GstQueue2 *queue;
3004   GstFlowReturn ret;
3005
3006   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
3007
3008   /* have to lock for thread-safety */
3009   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3010
3011   if (gst_queue2_is_empty (queue)) {
3012     gboolean started;
3013
3014     /* pause the timer while we wait. The fact that we are waiting does not mean
3015      * the byterate on the output pad is lower */
3016     if ((started = queue->out_timer_started))
3017       g_timer_stop (queue->out_timer);
3018
3019     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
3020         "queue is empty, waiting for new data");
3021     do {
3022       /* Wait for data to be available, we could be unlocked because of a flush. */
3023       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
3024     }
3025     while (gst_queue2_is_empty (queue));
3026
3027     /* and continue if we were running before */
3028     if (started)
3029       g_timer_continue (queue->out_timer);
3030   }
3031   ret = gst_queue2_push_one (queue);
3032   queue->srcresult = ret;
3033   queue->sinkresult = ret;
3034   if (ret != GST_FLOW_OK)
3035     goto out_flushing;
3036
3037   GST_QUEUE2_MUTEX_UNLOCK (queue);
3038   gst_queue2_post_buffering (queue);
3039
3040   return;
3041
3042   /* ERRORS */
3043 out_flushing:
3044   {
3045     gboolean eos = queue->is_eos;
3046     GstFlowReturn ret = queue->srcresult;
3047
3048     gst_pad_pause_task (queue->srcpad);
3049     if (ret == GST_FLOW_FLUSHING) {
3050       gst_queue2_locked_flush (queue, FALSE, FALSE);
3051     } else {
3052       GST_QUEUE2_SIGNAL_DEL (queue);
3053       queue->last_query = FALSE;
3054       g_cond_signal (&queue->query_handled);
3055     }
3056     GST_QUEUE2_MUTEX_UNLOCK (queue);
3057     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3058         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
3059     /* let app know about us giving up if upstream is not expected to do so */
3060     /* EOS is already taken care of elsewhere */
3061     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
3062       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
3063           (_("Internal data flow error.")),
3064           ("streaming task paused, reason %s (%d)",
3065               gst_flow_get_name (ret), ret));
3066       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
3067     }
3068     return;
3069   }
3070 }
3071
3072 static gboolean
3073 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3074 {
3075   gboolean res = TRUE;
3076   GstQueue2 *queue = GST_QUEUE2 (parent);
3077
3078 #ifndef GST_DISABLE_GST_DEBUG
3079   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
3080       event, GST_EVENT_TYPE_NAME (event));
3081 #endif
3082
3083   switch (GST_EVENT_TYPE (event)) {
3084     case GST_EVENT_FLUSH_START:
3085       if (QUEUE_IS_USING_QUEUE (queue)) {
3086         /* just forward upstream */
3087         res = gst_pad_push_event (queue->sinkpad, event);
3088       } else {
3089         /* now unblock the getrange function */
3090         GST_QUEUE2_MUTEX_LOCK (queue);
3091         GST_DEBUG_OBJECT (queue, "flushing");
3092         queue->srcresult = GST_FLOW_FLUSHING;
3093         GST_QUEUE2_SIGNAL_ADD (queue);
3094         GST_QUEUE2_MUTEX_UNLOCK (queue);
3095
3096         /* when using a temp file, we eat the event */
3097         res = TRUE;
3098         gst_event_unref (event);
3099       }
3100       break;
3101     case GST_EVENT_FLUSH_STOP:
3102       if (QUEUE_IS_USING_QUEUE (queue)) {
3103         /* just forward upstream */
3104         res = gst_pad_push_event (queue->sinkpad, event);
3105       } else {
3106         /* now unblock the getrange function */
3107         GST_QUEUE2_MUTEX_LOCK (queue);
3108         queue->srcresult = GST_FLOW_OK;
3109         GST_QUEUE2_MUTEX_UNLOCK (queue);
3110
3111         /* when using a temp file, we eat the event */
3112         res = TRUE;
3113         gst_event_unref (event);
3114       }
3115       break;
3116     case GST_EVENT_RECONFIGURE:
3117       GST_QUEUE2_MUTEX_LOCK (queue);
3118       /* assume downstream is linked now and try to push again */
3119       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
3120         queue->srcresult = GST_FLOW_OK;
3121         queue->sinkresult = GST_FLOW_OK;
3122         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
3123           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
3124               NULL);
3125         }
3126       }
3127       GST_QUEUE2_MUTEX_UNLOCK (queue);
3128
3129       res = gst_pad_push_event (queue->sinkpad, event);
3130       break;
3131     default:
3132       res = gst_pad_push_event (queue->sinkpad, event);
3133       break;
3134   }
3135
3136   return res;
3137 }
3138
3139 static gboolean
3140 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3141 {
3142   GstQueue2 *queue;
3143
3144   queue = GST_QUEUE2 (parent);
3145
3146   switch (GST_QUERY_TYPE (query)) {
3147     case GST_QUERY_POSITION:
3148     {
3149       gint64 peer_pos;
3150       GstFormat format;
3151
3152       if (!gst_pad_peer_query (queue->sinkpad, query))
3153         goto peer_failed;
3154
3155       /* get peer position */
3156       gst_query_parse_position (query, &format, &peer_pos);
3157
3158       /* FIXME: this code assumes that there's no discont in the queue */
3159       switch (format) {
3160         case GST_FORMAT_BYTES:
3161           peer_pos -= queue->cur_level.bytes;
3162           break;
3163         case GST_FORMAT_TIME:
3164           peer_pos -= queue->cur_level.time;
3165           break;
3166         default:
3167           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3168               "know how to adjust value", gst_format_get_name (format));
3169           return FALSE;
3170       }
3171       /* set updated position */
3172       gst_query_set_position (query, format, peer_pos);
3173       break;
3174     }
3175     case GST_QUERY_DURATION:
3176     {
3177       GST_DEBUG_OBJECT (queue, "doing peer query");
3178
3179       if (!gst_pad_peer_query (queue->sinkpad, query))
3180         goto peer_failed;
3181
3182       GST_DEBUG_OBJECT (queue, "peer query success");
3183       break;
3184     }
3185     case GST_QUERY_BUFFERING:
3186     {
3187       gint percent;
3188       gboolean is_buffering;
3189       GstBufferingMode mode;
3190       gint avg_in, avg_out;
3191       gint64 buffering_left;
3192
3193       GST_DEBUG_OBJECT (queue, "query buffering");
3194
3195       get_buffering_level (queue, &is_buffering, &percent);
3196       percent = convert_to_buffering_percent (queue, percent);
3197       gst_query_set_buffering_percent (query, is_buffering, percent);
3198
3199       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3200           &buffering_left);
3201       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3202           buffering_left);
3203
3204       if (!QUEUE_IS_USING_QUEUE (queue)) {
3205         /* add ranges for download and ringbuffer buffering */
3206         GstFormat format;
3207         gint64 start, stop, range_start, range_stop;
3208         guint64 writing_pos;
3209         gint64 estimated_total;
3210         gint64 duration;
3211         gboolean peer_res, is_eos;
3212         GstQueue2Range *queued_ranges;
3213
3214         /* we need a current download region */
3215         if (queue->current == NULL)
3216           return FALSE;
3217
3218         writing_pos = queue->current->writing_pos;
3219         is_eos = queue->is_eos;
3220
3221         if (is_eos) {
3222           /* we're EOS, we know the duration in bytes now */
3223           peer_res = TRUE;
3224           duration = writing_pos;
3225         } else {
3226           /* get duration of upstream in bytes */
3227           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3228               GST_FORMAT_BYTES, &duration);
3229         }
3230
3231         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3232             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3233
3234         /* calculate remaining and total download time */
3235         if (peer_res && avg_in > 0.0)
3236           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3237         else
3238           estimated_total = -1;
3239
3240         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3241             estimated_total);
3242
3243         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3244
3245         switch (format) {
3246           case GST_FORMAT_PERCENT:
3247             /* we need duration */
3248             if (!peer_res)
3249               goto peer_failed;
3250
3251             start = 0;
3252             /* get our available data relative to the duration */
3253             if (duration != -1)
3254               stop =
3255                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3256                   duration);
3257             else
3258               stop = -1;
3259             break;
3260           case GST_FORMAT_BYTES:
3261             start = 0;
3262             stop = writing_pos;
3263             break;
3264           default:
3265             start = -1;
3266             stop = -1;
3267             break;
3268         }
3269
3270         /* fill out the buffered ranges */
3271         for (queued_ranges = queue->ranges; queued_ranges;
3272             queued_ranges = queued_ranges->next) {
3273           switch (format) {
3274             case GST_FORMAT_PERCENT:
3275               if (duration == -1) {
3276                 range_start = 0;
3277                 range_stop = 0;
3278                 break;
3279               }
3280               range_start =
3281                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3282                   queued_ranges->offset, duration);
3283               range_stop =
3284                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3285                   queued_ranges->writing_pos, duration);
3286               break;
3287             case GST_FORMAT_BYTES:
3288               range_start = queued_ranges->offset;
3289               range_stop = queued_ranges->writing_pos;
3290               break;
3291             default:
3292               range_start = -1;
3293               range_stop = -1;
3294               break;
3295           }
3296           if (range_start == range_stop)
3297             continue;
3298           GST_DEBUG_OBJECT (queue,
3299               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3300               G_GINT64_FORMAT, range_start, range_stop);
3301           gst_query_add_buffering_range (query, range_start, range_stop);
3302         }
3303
3304         gst_query_set_buffering_range (query, format, start, stop,
3305             estimated_total);
3306       }
3307       break;
3308     }
3309     case GST_QUERY_SCHEDULING:
3310     {
3311       gboolean pull_mode;
3312       GstSchedulingFlags flags = 0;
3313
3314       if (!gst_pad_peer_query (queue->sinkpad, query))
3315         goto peer_failed;
3316
3317       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3318
3319       /* we can operate in pull mode when we are using a tempfile */
3320       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3321
3322       if (pull_mode)
3323         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3324       gst_query_set_scheduling (query, flags, 0, -1, 0);
3325       if (pull_mode)
3326         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3327       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3328       break;
3329     }
3330     default:
3331       /* peer handled other queries */
3332       if (!gst_pad_query_default (pad, parent, query))
3333         goto peer_failed;
3334       break;
3335   }
3336
3337   return TRUE;
3338
3339   /* ERRORS */
3340 peer_failed:
3341   {
3342     GST_DEBUG_OBJECT (queue, "failed peer query");
3343     return FALSE;
3344   }
3345 }
3346
3347 static gboolean
3348 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3349 {
3350   GstQueue2 *queue = GST_QUEUE2 (element);
3351
3352   /* simply forward to the srcpad query function */
3353   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3354       query);
3355 }
3356
3357 static void
3358 gst_queue2_update_upstream_size (GstQueue2 * queue)
3359 {
3360   gint64 upstream_size = -1;
3361
3362   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3363           &upstream_size)) {
3364     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3365
3366     /* upstream_size can be negative but queue->upstream_size is unsigned.
3367      * Prevent setting negative values to it (the query can return -1) */
3368     if (upstream_size >= 0)
3369       queue->upstream_size = upstream_size;
3370     else
3371       queue->upstream_size = 0;
3372   }
3373 }
3374
3375 static GstFlowReturn
3376 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3377     guint length, GstBuffer ** buffer)
3378 {
3379   GstQueue2 *queue;
3380   GstFlowReturn ret;
3381
3382   queue = GST_QUEUE2_CAST (parent);
3383
3384   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3385   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3386   offset = (offset == -1) ? queue->current->reading_pos : offset;
3387
3388   GST_DEBUG_OBJECT (queue,
3389       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3390
3391   /* catch any reads beyond the size of the file here to make sure queue2
3392    * doesn't send seek events beyond the size of the file upstream, since
3393    * that would confuse elements such as souphttpsrc and/or http servers.
3394    * Demuxers often just loop until EOS at the end of the file to figure out
3395    * when they've read all the end-headers or index chunks. */
3396   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3397     gst_queue2_update_upstream_size (queue);
3398     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3399       goto out_unexpected;
3400   }
3401
3402   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3403     gst_queue2_update_upstream_size (queue);
3404     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3405       length = queue->upstream_size - offset;
3406       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3407     }
3408   }
3409
3410   /* FIXME - function will block when the range is not yet available */
3411   ret = gst_queue2_create_read (queue, offset, length, buffer);
3412   GST_QUEUE2_MUTEX_UNLOCK (queue);
3413   gst_queue2_post_buffering (queue);
3414
3415   return ret;
3416
3417   /* ERRORS */
3418 out_flushing:
3419   {
3420     ret = queue->srcresult;
3421
3422     GST_DEBUG_OBJECT (queue, "we are flushing");
3423     GST_QUEUE2_MUTEX_UNLOCK (queue);
3424     return ret;
3425   }
3426 out_unexpected:
3427   {
3428     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3429     GST_QUEUE2_MUTEX_UNLOCK (queue);
3430     return GST_FLOW_EOS;
3431   }
3432 }
3433
3434 /* sink currently only operates in push mode */
3435 static gboolean
3436 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3437     GstPadMode mode, gboolean active)
3438 {
3439   gboolean result;
3440   GstQueue2 *queue;
3441
3442   queue = GST_QUEUE2 (parent);
3443
3444   switch (mode) {
3445     case GST_PAD_MODE_PUSH:
3446       if (active) {
3447         GST_QUEUE2_MUTEX_LOCK (queue);
3448         GST_DEBUG_OBJECT (queue, "activating push mode");
3449         queue->srcresult = GST_FLOW_OK;
3450         queue->sinkresult = GST_FLOW_OK;
3451         queue->is_eos = FALSE;
3452         queue->unexpected = FALSE;
3453         reset_rate_timer (queue);
3454         GST_QUEUE2_MUTEX_UNLOCK (queue);
3455       } else {
3456         /* unblock chain function */
3457         GST_QUEUE2_MUTEX_LOCK (queue);
3458         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3459         queue->srcresult = GST_FLOW_FLUSHING;
3460         queue->sinkresult = GST_FLOW_FLUSHING;
3461         GST_QUEUE2_SIGNAL_DEL (queue);
3462         GST_QUEUE2_MUTEX_UNLOCK (queue);
3463
3464         /* wait until it is unblocked and clean up */
3465         GST_PAD_STREAM_LOCK (pad);
3466         GST_QUEUE2_MUTEX_LOCK (queue);
3467         gst_queue2_locked_flush (queue, TRUE, FALSE);
3468         GST_QUEUE2_MUTEX_UNLOCK (queue);
3469         GST_PAD_STREAM_UNLOCK (pad);
3470       }
3471       result = TRUE;
3472       break;
3473     default:
3474       result = FALSE;
3475       break;
3476   }
3477   return result;
3478 }
3479
3480 /* src operating in push mode, we start a task on the source pad that pushes out
3481  * buffers from the queue */
3482 static gboolean
3483 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3484 {
3485   gboolean result = FALSE;
3486   GstQueue2 *queue;
3487
3488   queue = GST_QUEUE2 (parent);
3489
3490   if (active) {
3491     GST_QUEUE2_MUTEX_LOCK (queue);
3492     GST_DEBUG_OBJECT (queue, "activating push mode");
3493     queue->srcresult = GST_FLOW_OK;
3494     queue->sinkresult = GST_FLOW_OK;
3495     queue->is_eos = FALSE;
3496     queue->unexpected = FALSE;
3497     result =
3498         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3499     GST_QUEUE2_MUTEX_UNLOCK (queue);
3500   } else {
3501     /* unblock loop function */
3502     GST_QUEUE2_MUTEX_LOCK (queue);
3503     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3504     queue->srcresult = GST_FLOW_FLUSHING;
3505     queue->sinkresult = GST_FLOW_FLUSHING;
3506     /* the item add signal will unblock */
3507     GST_QUEUE2_SIGNAL_ADD (queue);
3508     GST_QUEUE2_MUTEX_UNLOCK (queue);
3509
3510     /* step 2, make sure streaming finishes */
3511     result = gst_pad_stop_task (pad);
3512   }
3513
3514   return result;
3515 }
3516
3517 /* pull mode, downstream will call our getrange function */
3518 static gboolean
3519 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3520 {
3521   gboolean result;
3522   GstQueue2 *queue;
3523
3524   queue = GST_QUEUE2 (parent);
3525
3526   if (active) {
3527     GST_QUEUE2_MUTEX_LOCK (queue);
3528     if (!QUEUE_IS_USING_QUEUE (queue)) {
3529       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3530         /* open the temp file now */
3531         result = gst_queue2_open_temp_location_file (queue);
3532       } else if (!queue->ring_buffer) {
3533         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3534         result = ! !queue->ring_buffer;
3535       } else {
3536         result = TRUE;
3537       }
3538
3539       GST_DEBUG_OBJECT (queue, "activating pull mode");
3540       init_ranges (queue);
3541       queue->srcresult = GST_FLOW_OK;
3542       queue->sinkresult = GST_FLOW_OK;
3543       queue->is_eos = FALSE;
3544       queue->unexpected = FALSE;
3545       queue->upstream_size = 0;
3546     } else {
3547       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3548       /* this is not allowed, we cannot operate in pull mode without a temp
3549        * file. */
3550       queue->srcresult = GST_FLOW_FLUSHING;
3551       queue->sinkresult = GST_FLOW_FLUSHING;
3552       result = FALSE;
3553     }
3554     GST_QUEUE2_MUTEX_UNLOCK (queue);
3555   } else {
3556     GST_QUEUE2_MUTEX_LOCK (queue);
3557     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3558     queue->srcresult = GST_FLOW_FLUSHING;
3559     queue->sinkresult = GST_FLOW_FLUSHING;
3560     /* this will unlock getrange */
3561     GST_QUEUE2_SIGNAL_ADD (queue);
3562     result = TRUE;
3563     GST_QUEUE2_MUTEX_UNLOCK (queue);
3564   }
3565
3566   return result;
3567 }
3568
3569 static gboolean
3570 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3571     gboolean active)
3572 {
3573   gboolean res;
3574
3575   switch (mode) {
3576     case GST_PAD_MODE_PULL:
3577       res = gst_queue2_src_activate_pull (pad, parent, active);
3578       break;
3579     case GST_PAD_MODE_PUSH:
3580       res = gst_queue2_src_activate_push (pad, parent, active);
3581       break;
3582     default:
3583       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3584       res = FALSE;
3585       break;
3586   }
3587   return res;
3588 }
3589
3590 static GstStateChangeReturn
3591 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3592 {
3593   GstQueue2 *queue;
3594   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3595
3596   queue = GST_QUEUE2 (element);
3597
3598   switch (transition) {
3599     case GST_STATE_CHANGE_NULL_TO_READY:
3600       break;
3601     case GST_STATE_CHANGE_READY_TO_PAUSED:
3602       GST_QUEUE2_MUTEX_LOCK (queue);
3603       if (!QUEUE_IS_USING_QUEUE (queue)) {
3604         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3605           if (!gst_queue2_open_temp_location_file (queue))
3606             ret = GST_STATE_CHANGE_FAILURE;
3607         } else {
3608           if (queue->ring_buffer) {
3609             g_free (queue->ring_buffer);
3610             queue->ring_buffer = NULL;
3611           }
3612           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3613             ret = GST_STATE_CHANGE_FAILURE;
3614         }
3615         init_ranges (queue);
3616       }
3617       queue->segment_event_received = FALSE;
3618       queue->starting_segment = NULL;
3619       gst_event_replace (&queue->stream_start_event, NULL);
3620       GST_QUEUE2_MUTEX_UNLOCK (queue);
3621       break;
3622     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3623       break;
3624     default:
3625       break;
3626   }
3627
3628   if (ret == GST_STATE_CHANGE_FAILURE)
3629     return ret;
3630
3631   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3632
3633   if (ret == GST_STATE_CHANGE_FAILURE)
3634     return ret;
3635
3636   switch (transition) {
3637     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3638       break;
3639     case GST_STATE_CHANGE_PAUSED_TO_READY:
3640       GST_QUEUE2_MUTEX_LOCK (queue);
3641       if (!QUEUE_IS_USING_QUEUE (queue)) {
3642         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3643           gst_queue2_close_temp_location_file (queue);
3644         } else if (queue->ring_buffer) {
3645           g_free (queue->ring_buffer);
3646           queue->ring_buffer = NULL;
3647         }
3648         clean_ranges (queue);
3649       }
3650       if (queue->starting_segment != NULL) {
3651         gst_event_unref (queue->starting_segment);
3652         queue->starting_segment = NULL;
3653       }
3654       gst_event_replace (&queue->stream_start_event, NULL);
3655       GST_QUEUE2_MUTEX_UNLOCK (queue);
3656       break;
3657     case GST_STATE_CHANGE_READY_TO_NULL:
3658       break;
3659     default:
3660       break;
3661   }
3662
3663   return ret;
3664 }
3665
3666 /* changing the capacity of the queue must wake up
3667  * the _chain function, it might have more room now
3668  * to store the buffer/event in the queue */
3669 #define QUEUE_CAPACITY_CHANGE(q) \
3670   GST_QUEUE2_SIGNAL_DEL (queue); \
3671   if (queue->use_buffering)      \
3672     update_buffering (queue);
3673
3674 /* Changing the minimum required fill level must
3675  * wake up the _loop function as it might now
3676  * be able to preceed.
3677  */
3678 #define QUEUE_THRESHOLD_CHANGE(q)\
3679   GST_QUEUE2_SIGNAL_ADD (queue);
3680
3681 static void
3682 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3683 {
3684   GstState state;
3685
3686   /* the element must be stopped in order to do this */
3687   GST_OBJECT_LOCK (queue);
3688   state = GST_STATE (queue);
3689   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3690     goto wrong_state;
3691   GST_OBJECT_UNLOCK (queue);
3692
3693   /* set new location */
3694   g_free (queue->temp_template);
3695   queue->temp_template = g_strdup (template);
3696
3697   return;
3698
3699 /* ERROR */
3700 wrong_state:
3701   {
3702     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3703     GST_OBJECT_UNLOCK (queue);
3704   }
3705 }
3706
3707 static void
3708 gst_queue2_set_property (GObject * object,
3709     guint prop_id, const GValue * value, GParamSpec * pspec)
3710 {
3711   GstQueue2 *queue = GST_QUEUE2 (object);
3712
3713   /* someone could change levels here, and since this
3714    * affects the get/put funcs, we need to lock for safety. */
3715   GST_QUEUE2_MUTEX_LOCK (queue);
3716
3717   switch (prop_id) {
3718     case PROP_MAX_SIZE_BYTES:
3719       queue->max_level.bytes = g_value_get_uint (value);
3720       QUEUE_CAPACITY_CHANGE (queue);
3721       break;
3722     case PROP_MAX_SIZE_BUFFERS:
3723       queue->max_level.buffers = g_value_get_uint (value);
3724       QUEUE_CAPACITY_CHANGE (queue);
3725       break;
3726     case PROP_MAX_SIZE_TIME:
3727       queue->max_level.time = g_value_get_uint64 (value);
3728       /* set rate_time to the same value. We use an extra field in the level
3729        * structure so that we can easily access and compare it */
3730       queue->max_level.rate_time = queue->max_level.time;
3731       QUEUE_CAPACITY_CHANGE (queue);
3732       break;
3733     case PROP_USE_BUFFERING:
3734       queue->use_buffering = g_value_get_boolean (value);
3735       if (!queue->use_buffering && queue->is_buffering) {
3736         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3737             "posting 100%% message");
3738         SET_PERCENT (queue, 100);
3739         queue->is_buffering = FALSE;
3740       }
3741
3742       if (queue->use_buffering) {
3743         queue->is_buffering = TRUE;
3744         update_buffering (queue);
3745       }
3746       break;
3747     case PROP_USE_TAGS_BITRATE:
3748       queue->use_tags_bitrate = g_value_get_boolean (value);
3749       break;
3750     case PROP_USE_RATE_ESTIMATE:
3751       queue->use_rate_estimate = g_value_get_boolean (value);
3752       break;
3753     case PROP_LOW_PERCENT:
3754       queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3755       break;
3756     case PROP_HIGH_PERCENT:
3757       queue->high_watermark =
3758           g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3759       break;
3760     case PROP_LOW_WATERMARK:
3761       queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3762       break;
3763     case PROP_HIGH_WATERMARK:
3764       queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3765       break;
3766     case PROP_TEMP_TEMPLATE:
3767       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3768       break;
3769     case PROP_TEMP_REMOVE:
3770       queue->temp_remove = g_value_get_boolean (value);
3771       break;
3772     case PROP_RING_BUFFER_MAX_SIZE:
3773       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3774       break;
3775     default:
3776       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3777       break;
3778   }
3779
3780   GST_QUEUE2_MUTEX_UNLOCK (queue);
3781   gst_queue2_post_buffering (queue);
3782 }
3783
3784 static void
3785 gst_queue2_get_property (GObject * object,
3786     guint prop_id, GValue * value, GParamSpec * pspec)
3787 {
3788   GstQueue2 *queue = GST_QUEUE2 (object);
3789
3790   GST_QUEUE2_MUTEX_LOCK (queue);
3791
3792   switch (prop_id) {
3793     case PROP_CUR_LEVEL_BYTES:
3794       g_value_set_uint (value, queue->cur_level.bytes);
3795       break;
3796     case PROP_CUR_LEVEL_BUFFERS:
3797       g_value_set_uint (value, queue->cur_level.buffers);
3798       break;
3799     case PROP_CUR_LEVEL_TIME:
3800       g_value_set_uint64 (value, queue->cur_level.time);
3801       break;
3802     case PROP_MAX_SIZE_BYTES:
3803       g_value_set_uint (value, queue->max_level.bytes);
3804       break;
3805     case PROP_MAX_SIZE_BUFFERS:
3806       g_value_set_uint (value, queue->max_level.buffers);
3807       break;
3808     case PROP_MAX_SIZE_TIME:
3809       g_value_set_uint64 (value, queue->max_level.time);
3810       break;
3811     case PROP_USE_BUFFERING:
3812       g_value_set_boolean (value, queue->use_buffering);
3813       break;
3814     case PROP_USE_TAGS_BITRATE:
3815       g_value_set_boolean (value, queue->use_tags_bitrate);
3816       break;
3817     case PROP_USE_RATE_ESTIMATE:
3818       g_value_set_boolean (value, queue->use_rate_estimate);
3819       break;
3820     case PROP_LOW_PERCENT:
3821       g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
3822       break;
3823     case PROP_HIGH_PERCENT:
3824       g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
3825       break;
3826     case PROP_LOW_WATERMARK:
3827       g_value_set_double (value, queue->low_watermark /
3828           (gdouble) MAX_BUFFERING_LEVEL);
3829       break;
3830     case PROP_HIGH_WATERMARK:
3831       g_value_set_double (value, queue->high_watermark /
3832           (gdouble) MAX_BUFFERING_LEVEL);
3833       break;
3834     case PROP_TEMP_TEMPLATE:
3835       g_value_set_string (value, queue->temp_template);
3836       break;
3837     case PROP_TEMP_LOCATION:
3838       g_value_set_string (value, queue->temp_location);
3839       break;
3840     case PROP_TEMP_REMOVE:
3841       g_value_set_boolean (value, queue->temp_remove);
3842       break;
3843     case PROP_RING_BUFFER_MAX_SIZE:
3844       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3845       break;
3846     case PROP_AVG_IN_RATE:
3847     {
3848       gdouble in_rate = queue->byte_in_rate;
3849
3850       /* During the first RATE_INTERVAL, byte_in_rate will not have been
3851        * calculated, so calculate it here. */
3852       if (in_rate == 0.0 && queue->bytes_in
3853           && queue->last_update_in_rates_elapsed > 0.0)
3854         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
3855
3856       g_value_set_int64 (value, (gint64) in_rate);
3857       break;
3858     }
3859     default:
3860       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3861       break;
3862   }
3863
3864   GST_QUEUE2_MUTEX_UNLOCK (queue);
3865 }