queue2: use GstQueueArray
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  * @title: queue2
29  *
30  * Data is queued until one of the limits specified by the
31  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
32  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
33  * more buffers into the queue will block the pushing thread until more space
34  * becomes available.
35  *
36  * The queue will create a new thread on the source pad to decouple the
37  * processing on sink and source pad.
38  *
39  * You can query how many buffers are queued by reading the
40  * #GstQueue2:current-level-buffers property.
41  *
42  * The default queue size limits are 100 buffers, 2MB of data, or
43  * two seconds worth of data, whichever is reached first.
44  *
45  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
46  * will allocate a random free filename and buffer data in the file.
47  * By using this, it will buffer the entire stream data on the file independently
48  * of the queue size limits, they will only be used for buffering statistics.
49  *
50  * The temp-location property will be used to notify the application of the
51  * allocated filename.
52  */
53
54 #ifdef HAVE_CONFIG_H
55 #include "config.h"
56 #endif
57
58 #include "gstqueue2.h"
59
60 #include <glib/gstdio.h>
61
62 #include "gst/gst-i18n-lib.h"
63 #include "gst/glib-compat-private.h"
64
65 #include <string.h>
66
67 #ifdef G_OS_WIN32
68 #include <io.h>                 /* lseek, open, close, read */
69 #undef lseek
70 #define lseek _lseeki64
71 #undef off_t
72 #define off_t guint64
73 #else
74 #include <unistd.h>
75 #endif
76
77 #ifdef __BIONIC__               /* Android */
78 #include <fcntl.h>
79 #endif
80
81 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
82     GST_PAD_SINK,
83     GST_PAD_ALWAYS,
84     GST_STATIC_CAPS_ANY);
85
86 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
87     GST_PAD_SRC,
88     GST_PAD_ALWAYS,
89     GST_STATIC_CAPS_ANY);
90
91 GST_DEBUG_CATEGORY_STATIC (queue_debug);
92 #define GST_CAT_DEFAULT (queue_debug)
93 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
94
95 enum
96 {
97   LAST_SIGNAL
98 };
99
100 /* other defines */
101 #define DEFAULT_BUFFER_SIZE 4096
102 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
103 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
104 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
105
106 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
107
108 /* default property values */
109 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
110 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
111 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
112 #define DEFAULT_USE_BUFFERING      FALSE
113 #define DEFAULT_USE_TAGS_BITRATE   FALSE
114 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
115 #define DEFAULT_LOW_PERCENT        10
116 #define DEFAULT_HIGH_PERCENT       99
117 #define DEFAULT_LOW_WATERMARK      0.01
118 #define DEFAULT_HIGH_WATERMARK     0.99
119 #define DEFAULT_TEMP_REMOVE        TRUE
120 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
121
122 enum
123 {
124   PROP_0,
125   PROP_CUR_LEVEL_BUFFERS,
126   PROP_CUR_LEVEL_BYTES,
127   PROP_CUR_LEVEL_TIME,
128   PROP_MAX_SIZE_BUFFERS,
129   PROP_MAX_SIZE_BYTES,
130   PROP_MAX_SIZE_TIME,
131   PROP_USE_BUFFERING,
132   PROP_USE_TAGS_BITRATE,
133   PROP_USE_RATE_ESTIMATE,
134   PROP_LOW_PERCENT,
135   PROP_HIGH_PERCENT,
136   PROP_LOW_WATERMARK,
137   PROP_HIGH_WATERMARK,
138   PROP_TEMP_TEMPLATE,
139   PROP_TEMP_LOCATION,
140   PROP_TEMP_REMOVE,
141   PROP_RING_BUFFER_MAX_SIZE,
142   PROP_AVG_IN_RATE,
143   PROP_LAST
144 };
145
146 /* Explanation for buffer levels and percentages:
147  *
148  * The buffering_level functions here return a value in a normalized range
149  * that specifies the queue's current fill level. The range goes from 0 to
150  * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
151  *
152  * This is not to be confused with the buffering_percent value, which is
153  * a *relative* quantity - relative to the low/high watermarks.
154  * buffering_percent = 0% means buffering_level is at the low watermark.
155  * buffering_percent = 100% means buffering_level is at the high watermark.
156  * buffering_percent is used for determining if the fill level has reached
157  * the high watermark, and for producing BUFFERING messages. This value
158  * always uses a 0..100 range (since it is a percentage).
159  *
160  * To avoid future confusions, whenever "buffering level" is mentioned, it
161  * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
162  * range. Whenever "buffering_percent" is mentioned, it refers to the
163  * percentage value that is relative to the low/high watermark. */
164
165 /* Using a buffering level range of 0..1000000 to allow for a
166  * resolution in ppm (1 ppm = 0.0001%) */
167 #define MAX_BUFFERING_LEVEL 1000000
168
169 /* How much 1% makes up in the buffer level range */
170 #define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
171
172 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
173   l.buffers = 0;                                        \
174   l.bytes = 0;                                          \
175   l.time = 0;                                           \
176   l.rate_time = 0;                                      \
177 } G_STMT_END
178
179 #define STATUS(queue, pad, msg) \
180   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
181                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
182                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
183                       " ns, %"G_GUINT64_FORMAT" items", \
184                       GST_DEBUG_PAD_NAME (pad), \
185                       queue->cur_level.buffers, \
186                       queue->max_level.buffers, \
187                       queue->cur_level.bytes, \
188                       queue->max_level.bytes, \
189                       queue->cur_level.time, \
190                       queue->max_level.time, \
191                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
192                         queue->current->writing_pos - queue->current->max_reading_pos : \
193                         gst_queue_array_get_length(queue->queue)))
194
195 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
196   g_mutex_lock (&q->qlock);                                              \
197 } G_STMT_END
198
199 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
200   GST_QUEUE2_MUTEX_LOCK (q);                                            \
201   if (res != GST_FLOW_OK)                                               \
202     goto label;                                                         \
203 } G_STMT_END
204
205 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
206   g_mutex_unlock (&q->qlock);                                            \
207 } G_STMT_END
208
209 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
210   STATUS (queue, q->sinkpad, "wait for DEL");                           \
211   q->waiting_del = TRUE;                                                \
212   g_cond_wait (&q->item_del, &queue->qlock);                              \
213   q->waiting_del = FALSE;                                               \
214   if (res != GST_FLOW_OK) {                                             \
215     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
216     goto label;                                                         \
217   }                                                                     \
218   STATUS (queue, q->sinkpad, "received DEL");                           \
219 } G_STMT_END
220
221 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
222   STATUS (queue, q->srcpad, "wait for ADD");                            \
223   q->waiting_add = TRUE;                                                \
224   g_cond_wait (&q->item_add, &q->qlock);                                  \
225   q->waiting_add = FALSE;                                               \
226   if (res != GST_FLOW_OK) {                                             \
227     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
228     goto label;                                                         \
229   }                                                                     \
230   STATUS (queue, q->srcpad, "received ADD");                            \
231 } G_STMT_END
232
233 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
234   if (q->waiting_del) {                                                 \
235     STATUS (q, q->srcpad, "signal DEL");                                \
236     g_cond_signal (&q->item_del);                                        \
237   }                                                                     \
238 } G_STMT_END
239
240 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
241   if (q->waiting_add) {                                                 \
242     STATUS (q, q->sinkpad, "signal ADD");                               \
243     g_cond_signal (&q->item_add);                                        \
244   }                                                                     \
245 } G_STMT_END
246
247 #define SET_PERCENT(q, perc) G_STMT_START {                              \
248   if (perc != q->buffering_percent) {                                    \
249     q->buffering_percent = perc;                                         \
250     q->percent_changed = TRUE;                                           \
251     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
252     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
253         &q->buffering_left);                                             \
254   }                                                                      \
255 } G_STMT_END
256
257 #define _do_init \
258     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
259     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
260         "dataflow inside the queue element");
261 #define gst_queue2_parent_class parent_class
262 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
263
264 static void gst_queue2_finalize (GObject * object);
265
266 static void gst_queue2_set_property (GObject * object,
267     guint prop_id, const GValue * value, GParamSpec * pspec);
268 static void gst_queue2_get_property (GObject * object,
269     guint prop_id, GValue * value, GParamSpec * pspec);
270
271 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
272     GstBuffer * buffer);
273 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
274     GstBufferList * buffer_list);
275 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
276 static void gst_queue2_loop (GstPad * pad);
277
278 static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad,
279     GstObject * parent, GstEvent * event);
280 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
281     GstQuery * query);
282
283 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
284     GstEvent * event);
285 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
286     GstQuery * query);
287 static gboolean gst_queue2_handle_query (GstElement * element,
288     GstQuery * query);
289
290 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
291     guint64 offset, guint length, GstBuffer ** buffer);
292
293 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
294     GstPadMode mode, gboolean active);
295 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
296     GstPadMode mode, gboolean active);
297 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
298     GstStateChange transition);
299
300 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
301 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
302
303 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
304 static void update_in_rates (GstQueue2 * queue, gboolean force);
305 static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue);
306 static void gst_queue2_post_buffering (GstQueue2 * queue);
307
308 typedef enum
309 {
310   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
311   GST_QUEUE2_ITEM_TYPE_BUFFER,
312   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
313   GST_QUEUE2_ITEM_TYPE_EVENT,
314   GST_QUEUE2_ITEM_TYPE_QUERY
315 } GstQueue2ItemType;
316
317 typedef struct
318 {
319   GstQueue2ItemType type;
320   GstMiniObject *item;
321 } GstQueue2Item;
322
323 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
324
325 static void
326 gst_queue2_class_init (GstQueue2Class * klass)
327 {
328   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
329   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
330
331   gobject_class->set_property = gst_queue2_set_property;
332   gobject_class->get_property = gst_queue2_get_property;
333
334   /* properties */
335   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
336       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
337           "Current amount of data in the queue (bytes)",
338           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
339   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
340       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
341           "Current number of buffers in the queue",
342           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
343   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
344       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
345           "Current amount of data in the queue (in ns)",
346           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
347
348   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
349       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
350           "Max. amount of data in the queue (bytes, 0=disable)",
351           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
352           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
353           G_PARAM_STATIC_STRINGS));
354   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
355       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
356           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
357           DEFAULT_MAX_SIZE_BUFFERS,
358           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
359           G_PARAM_STATIC_STRINGS));
360   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
361       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
362           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
363           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
364           G_PARAM_STATIC_STRINGS));
365
366   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
367       g_param_spec_boolean ("use-buffering", "Use buffering",
368           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
369           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
370           G_PARAM_STATIC_STRINGS));
371   g_object_class_install_property (gobject_class, PROP_USE_TAGS_BITRATE,
372       g_param_spec_boolean ("use-tags-bitrate", "Use bitrate from tags",
373           "Use a bitrate from upstream tags to estimate buffer duration if not provided",
374           DEFAULT_USE_TAGS_BITRATE,
375           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
376           G_PARAM_STATIC_STRINGS));
377   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
378       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
379           "Estimate the bitrate of the stream to calculate time level",
380           DEFAULT_USE_RATE_ESTIMATE,
381           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
382   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
383       g_param_spec_int ("low-percent", "Low percent",
384           "Low threshold for buffering to start. Only used if use-buffering is True "
385           "(Deprecated: use low-watermark instead)",
386           0, 100, DEFAULT_LOW_WATERMARK * 100,
387           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
388   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
389       g_param_spec_int ("high-percent", "High percent",
390           "High threshold for buffering to finish. Only used if use-buffering is True "
391           "(Deprecated: use high-watermark instead)",
392           0, 100, DEFAULT_HIGH_WATERMARK * 100,
393           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
394   g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
395       g_param_spec_double ("low-watermark", "Low watermark",
396           "Low threshold for buffering to start. Only used if use-buffering is True",
397           0.0, 1.0, DEFAULT_LOW_WATERMARK,
398           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399   g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
400       g_param_spec_double ("high-watermark", "High watermark",
401           "High threshold for buffering to finish. Only used if use-buffering is True",
402           0.0, 1.0, DEFAULT_HIGH_WATERMARK,
403           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404
405   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
406       g_param_spec_string ("temp-template", "Temporary File Template",
407           "File template to store temporary files in, should contain directory "
408           "and XXXXXX. (NULL == disabled)",
409           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
410
411   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
412       g_param_spec_string ("temp-location", "Temporary File Location",
413           "Location to store temporary files in (Only read this property, "
414           "use temp-template to configure the name template)",
415           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
416
417   /**
418    * GstQueue2:temp-remove
419    *
420    * When temp-template is set, remove the temporary file when going to READY.
421    */
422   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
423       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
424           "Remove the temp-location after use",
425           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
426
427   /**
428    * GstQueue2:ring-buffer-max-size
429    *
430    * The maximum size of the ring buffer in bytes. If set to 0, the ring
431    * buffer is disabled. Default 0.
432    */
433   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
434       g_param_spec_uint64 ("ring-buffer-max-size",
435           "Max. ring buffer size (bytes)",
436           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
437           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
438           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
439
440   /**
441    * GstQueue2:avg-in-rate
442    *
443    * The average input data rate.
444    */
445   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
446       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
447           "Average input data rate (bytes/s)",
448           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
449
450   /* set several parent class virtual functions */
451   gobject_class->finalize = gst_queue2_finalize;
452
453   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
454   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
455
456   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
457       "Generic",
458       "Simple data queue",
459       "Erik Walthinsen <omega@cse.ogi.edu>, "
460       "Wim Taymans <wim.taymans@gmail.com>");
461
462   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
463   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
464 }
465
466 static void
467 gst_queue2_init (GstQueue2 * queue)
468 {
469   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
470
471   gst_pad_set_chain_function (queue->sinkpad,
472       GST_DEBUG_FUNCPTR (gst_queue2_chain));
473   gst_pad_set_chain_list_function (queue->sinkpad,
474       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
475   gst_pad_set_activatemode_function (queue->sinkpad,
476       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
477   gst_pad_set_event_full_function (queue->sinkpad,
478       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
479   gst_pad_set_query_function (queue->sinkpad,
480       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
481   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
482   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
483
484   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
485
486   gst_pad_set_activatemode_function (queue->srcpad,
487       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
488   gst_pad_set_getrange_function (queue->srcpad,
489       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
490   gst_pad_set_event_function (queue->srcpad,
491       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
492   gst_pad_set_query_function (queue->srcpad,
493       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
494   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
495   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
496
497   /* levels */
498   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
499   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
500   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
501   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
502   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
503   queue->use_buffering = DEFAULT_USE_BUFFERING;
504   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
505   queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
506   queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
507
508   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
509   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
510
511   queue->sinktime = GST_CLOCK_TIME_NONE;
512   queue->srctime = GST_CLOCK_TIME_NONE;
513   queue->sink_tainted = TRUE;
514   queue->src_tainted = TRUE;
515
516   queue->srcresult = GST_FLOW_FLUSHING;
517   queue->sinkresult = GST_FLOW_FLUSHING;
518   queue->is_eos = FALSE;
519   queue->in_timer = g_timer_new ();
520   queue->out_timer = g_timer_new ();
521
522   g_mutex_init (&queue->qlock);
523   queue->waiting_add = FALSE;
524   g_cond_init (&queue->item_add);
525   queue->waiting_del = FALSE;
526   g_cond_init (&queue->item_del);
527   queue->queue = gst_queue_array_new_for_struct (sizeof (GstQueue2Item), 32);
528
529   g_cond_init (&queue->query_handled);
530   queue->last_query = FALSE;
531
532   g_mutex_init (&queue->buffering_post_lock);
533   queue->buffering_percent = 100;
534
535   /* tempfile related */
536   queue->temp_template = NULL;
537   queue->temp_location = NULL;
538   queue->temp_remove = DEFAULT_TEMP_REMOVE;
539
540   queue->ring_buffer = NULL;
541   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
542
543   GST_DEBUG_OBJECT (queue,
544       "initialized queue's not_empty & not_full conditions");
545 }
546
547 /* called only once, as opposed to dispose */
548 static void
549 gst_queue2_finalize (GObject * object)
550 {
551   GstQueue2 *queue = GST_QUEUE2 (object);
552   GstQueue2Item *qitem;
553
554   GST_DEBUG_OBJECT (queue, "finalizing queue");
555
556   while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
557     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
558       gst_mini_object_unref (qitem->item);
559   }
560   gst_queue_array_free (queue->queue);
561
562   queue->last_query = FALSE;
563   g_mutex_clear (&queue->qlock);
564   g_mutex_clear (&queue->buffering_post_lock);
565   g_cond_clear (&queue->item_add);
566   g_cond_clear (&queue->item_del);
567   g_cond_clear (&queue->query_handled);
568   g_timer_destroy (queue->in_timer);
569   g_timer_destroy (queue->out_timer);
570
571   /* temp_file path cleanup  */
572   g_free (queue->temp_template);
573   g_free (queue->temp_location);
574
575   G_OBJECT_CLASS (parent_class)->finalize (object);
576 }
577
578 static void
579 debug_ranges (GstQueue2 * queue)
580 {
581   GstQueue2Range *walk;
582
583   for (walk = queue->ranges; walk; walk = walk->next) {
584     GST_DEBUG_OBJECT (queue,
585         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
586         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
587         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
588         walk->rb_writing_pos, walk->reading_pos,
589         walk == queue->current ? "**y**" : "  n  ");
590   }
591 }
592
593 /* clear all the downloaded ranges */
594 static void
595 clean_ranges (GstQueue2 * queue)
596 {
597   GST_DEBUG_OBJECT (queue, "clean queue ranges");
598
599   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
600   queue->ranges = NULL;
601   queue->current = NULL;
602 }
603
604 /* find a range that contains @offset or NULL when nothing does */
605 static GstQueue2Range *
606 find_range (GstQueue2 * queue, guint64 offset)
607 {
608   GstQueue2Range *range = NULL;
609   GstQueue2Range *walk;
610
611   /* first do a quick check for the current range */
612   for (walk = queue->ranges; walk; walk = walk->next) {
613     if (offset >= walk->offset && offset <= walk->writing_pos) {
614       /* we can reuse an existing range */
615       range = walk;
616       break;
617     }
618   }
619   if (range) {
620     GST_DEBUG_OBJECT (queue,
621         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
622         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
623   } else {
624     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
625   }
626   return range;
627 }
628
629 static void
630 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
631 {
632   guint64 max_reading_pos, writing_pos;
633
634   writing_pos = range->writing_pos;
635   max_reading_pos = range->max_reading_pos;
636
637   if (writing_pos > max_reading_pos)
638     queue->cur_level.bytes = writing_pos - max_reading_pos;
639   else
640     queue->cur_level.bytes = 0;
641 }
642
643 /* make a new range for @offset or reuse an existing range */
644 static GstQueue2Range *
645 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
646 {
647   GstQueue2Range *range, *prev, *next;
648
649   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
650
651   if ((range = find_range (queue, offset))) {
652     GST_DEBUG_OBJECT (queue,
653         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
654         range->writing_pos);
655     if (update_existing && range->writing_pos != offset) {
656       GST_DEBUG_OBJECT (queue, "updating range writing position to "
657           "%" G_GUINT64_FORMAT, offset);
658       range->writing_pos = offset;
659     }
660   } else {
661     GST_DEBUG_OBJECT (queue,
662         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
663
664     range = g_slice_new0 (GstQueue2Range);
665     range->offset = offset;
666     /* we want to write to the next location in the ring buffer */
667     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
668     range->writing_pos = offset;
669     range->rb_writing_pos = range->rb_offset;
670     range->reading_pos = offset;
671     range->max_reading_pos = offset;
672
673     /* insert sorted */
674     prev = NULL;
675     next = queue->ranges;
676     while (next) {
677       if (next->offset > offset) {
678         /* insert before next */
679         GST_DEBUG_OBJECT (queue,
680             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
681             next->offset);
682         break;
683       }
684       /* try next */
685       prev = next;
686       next = next->next;
687     }
688     range->next = next;
689     if (prev)
690       prev->next = range;
691     else
692       queue->ranges = range;
693   }
694   debug_ranges (queue);
695
696   /* update the stats for this range */
697   update_cur_level (queue, range);
698
699   return range;
700 }
701
702
703 /* clear and init the download ranges for offset 0 */
704 static void
705 init_ranges (GstQueue2 * queue)
706 {
707   GST_DEBUG_OBJECT (queue, "init queue ranges");
708
709   /* get rid of all the current ranges */
710   clean_ranges (queue);
711   /* make a range for offset 0 */
712   queue->current = add_range (queue, 0, TRUE);
713 }
714
715 /* calculate the diff between running time on the sink and src of the queue.
716  * This is the total amount of time in the queue. */
717 static void
718 update_time_level (GstQueue2 * queue)
719 {
720   if (queue->sink_tainted) {
721     queue->sinktime =
722         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
723         queue->sink_segment.position);
724     queue->sink_tainted = FALSE;
725   }
726
727   if (queue->src_tainted) {
728     queue->srctime =
729         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
730         queue->src_segment.position);
731     queue->src_tainted = FALSE;
732   }
733
734   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
735       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
736
737   if (queue->sinktime != GST_CLOCK_TIME_NONE
738       && queue->srctime != GST_CLOCK_TIME_NONE
739       && queue->sinktime >= queue->srctime)
740     queue->cur_level.time = queue->sinktime - queue->srctime;
741   else
742     queue->cur_level.time = 0;
743 }
744
745 /* take a SEGMENT event and apply the values to segment, updating the time
746  * level of queue. */
747 static void
748 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
749     gboolean is_sink)
750 {
751   gst_event_copy_segment (event, segment);
752
753   if (segment->format == GST_FORMAT_BYTES) {
754     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
755       /* start is where we'll be getting from and as such writing next */
756       queue->current = add_range (queue, segment->start, TRUE);
757     }
758   }
759
760   /* now configure the values, we use these to track timestamps on the
761    * sinkpad. */
762   if (segment->format != GST_FORMAT_TIME) {
763     /* non-time format, pretend the current time segment is closed with a
764      * 0 start and unknown stop time. */
765     segment->format = GST_FORMAT_TIME;
766     segment->start = 0;
767     segment->stop = -1;
768     segment->time = 0;
769   }
770
771   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
772
773   if (is_sink)
774     queue->sink_tainted = TRUE;
775   else
776     queue->src_tainted = TRUE;
777
778   /* segment can update the time level of the queue */
779   update_time_level (queue);
780 }
781
782 static void
783 apply_gap (GstQueue2 * queue, GstEvent * event,
784     GstSegment * segment, gboolean is_sink)
785 {
786   GstClockTime timestamp;
787   GstClockTime duration;
788
789   gst_event_parse_gap (event, &timestamp, &duration);
790
791   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
792
793     if (GST_CLOCK_TIME_IS_VALID (duration)) {
794       timestamp += duration;
795     }
796
797     segment->position = timestamp;
798
799     if (is_sink)
800       queue->sink_tainted = TRUE;
801     else
802       queue->src_tainted = TRUE;
803
804     /* calc diff with other end */
805     update_time_level (queue);
806   }
807 }
808
809 /* take a buffer and update segment, updating the time level of the queue. */
810 static void
811 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
812     guint64 size, gboolean is_sink)
813 {
814   GstClockTime duration, timestamp;
815
816   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
817   duration = GST_BUFFER_DURATION (buffer);
818
819   /* If we have no duration, pick one from the bitrate if we can */
820   if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) {
821     guint bitrate =
822         is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
823     if (bitrate)
824       duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
825   }
826
827   /* if no timestamp is set, assume it's continuous with the previous
828    * time */
829   if (timestamp == GST_CLOCK_TIME_NONE)
830     timestamp = segment->position;
831
832   /* add duration */
833   if (duration != GST_CLOCK_TIME_NONE)
834     timestamp += duration;
835
836   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
837       GST_TIME_ARGS (timestamp));
838
839   segment->position = timestamp;
840
841   if (is_sink)
842     queue->sink_tainted = TRUE;
843   else
844     queue->src_tainted = TRUE;
845
846   /* calc diff with other end */
847   update_time_level (queue);
848 }
849
850 struct BufListData
851 {
852   GstClockTime timestamp;
853   guint bitrate;
854 };
855
856 static gboolean
857 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
858 {
859   struct BufListData *bld = data;
860   GstClockTime *timestamp = &bld->timestamp;
861   GstClockTime btime;
862
863   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
864       " duration %" GST_TIME_FORMAT, idx,
865       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
866       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
867       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
868
869   btime = GST_BUFFER_DTS_OR_PTS (*buf);
870   if (GST_CLOCK_TIME_IS_VALID (btime))
871     *timestamp = btime;
872
873   if (GST_BUFFER_DURATION_IS_VALID (*buf))
874     *timestamp += GST_BUFFER_DURATION (*buf);
875   else if (bld->bitrate != 0) {
876     guint64 size = gst_buffer_get_size (*buf);
877
878     /* If we have no duration, pick one from the bitrate if we can */
879     *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size);
880   }
881
882
883   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
884   return TRUE;
885 }
886
887 /* take a buffer list and update segment, updating the time level of the queue */
888 static void
889 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
890     GstSegment * segment, gboolean is_sink)
891 {
892   struct BufListData bld;
893
894   /* if no timestamp is set, assume it's continuous with the previous time */
895   bld.timestamp = segment->position;
896
897   if (queue->use_tags_bitrate) {
898     if (is_sink)
899       bld.bitrate = queue->sink_tags_bitrate;
900     else
901       bld.bitrate = queue->src_tags_bitrate;
902   } else
903     bld.bitrate = 0;
904
905   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
906
907   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
908       GST_TIME_ARGS (bld.timestamp));
909
910   segment->position = bld.timestamp;
911
912   if (is_sink)
913     queue->sink_tainted = TRUE;
914   else
915     queue->src_tainted = TRUE;
916
917   /* calc diff with other end */
918   update_time_level (queue);
919 }
920
921 static inline gint
922 normalize_to_buffering_level (guint64 cur_level, guint64 max_level,
923     guint64 alt_max)
924 {
925   guint64 p;
926
927   if (max_level == 0)
928     return 0;
929
930   if (alt_max > 0)
931     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL,
932         MIN (max_level, alt_max));
933   else
934     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level);
935
936   return MIN (p, MAX_BUFFERING_LEVEL);
937 }
938
939 static gboolean
940 get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
941     gint * buffering_level)
942 {
943   gint buflevel, buflevel2;
944
945   if (queue->high_watermark <= 0) {
946     if (buffering_level)
947       *buffering_level = MAX_BUFFERING_LEVEL;
948     if (is_buffering)
949       *is_buffering = FALSE;
950     return FALSE;
951   }
952 #define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
953     normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
954
955   if (queue->is_eos || queue->srcresult == GST_FLOW_NOT_LINKED) {
956     /* on EOS and NOT_LINKED we are always 100% full, we set the var
957      * here so that we can reuse the logic below to stop buffering */
958     buflevel = MAX_BUFFERING_LEVEL;
959     GST_LOG_OBJECT (queue, "we are %s", queue->is_eos ? "EOS" : "NOT_LINKED");
960   } else {
961     GST_LOG_OBJECT (queue,
962         "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
963         queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
964         queue->cur_level.buffers);
965
966     /* figure out the buffering level we are filled, we take the max of all formats. */
967     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
968       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0);
969     } else {
970       guint64 rb_size = queue->ring_buffer_max_size;
971       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size);
972     }
973
974     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0);
975     buflevel = MAX (buflevel, buflevel2);
976
977     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0);
978     buflevel = MAX (buflevel, buflevel2);
979
980     /* also apply the rate estimate when we need to */
981     if (queue->use_rate_estimate) {
982       buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0);
983       buflevel = MAX (buflevel, buflevel2);
984     }
985
986     /* Don't get to 0% unless we're really empty */
987     if (queue->cur_level.bytes > 0)
988       buflevel = MAX (1, buflevel);
989   }
990 #undef GET_BUFFER_LEVEL_FOR_QUANTITY
991
992   if (is_buffering)
993     *is_buffering = queue->is_buffering;
994
995   if (buffering_level)
996     *buffering_level = buflevel;
997
998   GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering,
999       buflevel);
1000
1001   return TRUE;
1002 }
1003
1004 static gint
1005 convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level)
1006 {
1007   int percent;
1008
1009   /* scale so that if buffering_level equals the high watermark,
1010    * the percentage is 100% */
1011   percent = buffering_level * 100 / queue->high_watermark;
1012   /* clip */
1013   if (percent > 100)
1014     percent = 100;
1015
1016   return percent;
1017 }
1018
1019 static void
1020 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
1021     gint * avg_in, gint * avg_out, gint64 * buffering_left)
1022 {
1023   if (mode) {
1024     if (!QUEUE_IS_USING_QUEUE (queue)) {
1025       if (QUEUE_IS_USING_RING_BUFFER (queue))
1026         *mode = GST_BUFFERING_TIMESHIFT;
1027       else
1028         *mode = GST_BUFFERING_DOWNLOAD;
1029     } else {
1030       *mode = GST_BUFFERING_STREAM;
1031     }
1032   }
1033
1034   if (avg_in)
1035     *avg_in = queue->byte_in_rate;
1036   if (avg_out)
1037     *avg_out = queue->byte_out_rate;
1038
1039   if (buffering_left) {
1040     *buffering_left = (percent == 100 ? 0 : -1);
1041
1042     if (queue->use_rate_estimate) {
1043       guint64 max, cur;
1044
1045       max = queue->max_level.rate_time;
1046       cur = queue->cur_level.rate_time;
1047
1048       if (percent != 100 && max > cur)
1049         *buffering_left = (max - cur) / 1000000;
1050     }
1051   }
1052 }
1053
1054 /* Called with the lock taken */
1055 static GstMessage *
1056 gst_queue2_get_buffering_message (GstQueue2 * queue)
1057 {
1058   GstMessage *msg = NULL;
1059
1060   if (queue->percent_changed) {
1061     gint percent = queue->buffering_percent;
1062
1063     queue->percent_changed = FALSE;
1064
1065     GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
1066     msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
1067
1068     gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
1069         queue->avg_out, queue->buffering_left);
1070   }
1071
1072   return msg;
1073 }
1074
1075 static void
1076 gst_queue2_post_buffering (GstQueue2 * queue)
1077 {
1078   GstMessage *msg = NULL;
1079
1080   g_mutex_lock (&queue->buffering_post_lock);
1081   GST_QUEUE2_MUTEX_LOCK (queue);
1082   msg = gst_queue2_get_buffering_message (queue);
1083   GST_QUEUE2_MUTEX_UNLOCK (queue);
1084
1085   if (msg != NULL)
1086     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
1087
1088   g_mutex_unlock (&queue->buffering_post_lock);
1089 }
1090
1091 static void
1092 update_buffering (GstQueue2 * queue)
1093 {
1094   gint buffering_level, percent;
1095
1096   /* Ensure the variables used to calculate buffering state are up-to-date. */
1097   if (queue->current)
1098     update_cur_level (queue, queue->current);
1099   update_in_rates (queue, FALSE);
1100
1101   if (!get_buffering_level (queue, NULL, &buffering_level))
1102     return;
1103
1104   percent = convert_to_buffering_percent (queue, buffering_level);
1105
1106   if (queue->is_buffering) {
1107     /* if we were buffering see if we reached the high watermark */
1108     if (percent >= 100)
1109       queue->is_buffering = FALSE;
1110
1111     SET_PERCENT (queue, percent);
1112   } else {
1113     /* we were not buffering, check if we need to start buffering if we drop
1114      * below the low threshold */
1115     if (buffering_level < queue->low_watermark) {
1116       queue->is_buffering = TRUE;
1117       SET_PERCENT (queue, percent);
1118     }
1119   }
1120 }
1121
1122 static void
1123 reset_rate_timer (GstQueue2 * queue)
1124 {
1125   queue->bytes_in = 0;
1126   queue->bytes_out = 0;
1127   queue->byte_in_rate = 0.0;
1128   queue->byte_in_period = 0;
1129   queue->byte_out_rate = 0.0;
1130   queue->last_update_in_rates_elapsed = 0.0;
1131   queue->last_in_elapsed = 0.0;
1132   queue->last_out_elapsed = 0.0;
1133   queue->in_timer_started = FALSE;
1134   queue->out_timer_started = FALSE;
1135 }
1136
1137 /* the interval in seconds to recalculate the rate */
1138 #define RATE_INTERVAL    0.2
1139 /* Tuning for rate estimation. We use a large window for the input rate because
1140  * it should be stable when connected to a network. The output rate is less
1141  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1142  * therefore adapt more quickly.
1143  * However, initial input rate may be subject to a burst, and should therefore
1144  * initially also adapt more quickly to changes, and only later on give higher
1145  * weight to previous values. */
1146 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1147 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1148
1149 static void
1150 update_in_rates (GstQueue2 * queue, gboolean force)
1151 {
1152   gdouble elapsed, period;
1153   gdouble byte_in_rate;
1154
1155   if (!queue->in_timer_started) {
1156     queue->in_timer_started = TRUE;
1157     g_timer_start (queue->in_timer);
1158     return;
1159   }
1160
1161   queue->last_update_in_rates_elapsed = elapsed =
1162       g_timer_elapsed (queue->in_timer, NULL);
1163
1164   /* recalc after each interval. */
1165   if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1166     period = elapsed - queue->last_in_elapsed;
1167
1168     GST_DEBUG_OBJECT (queue,
1169         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1170         period, queue->bytes_in, queue->byte_in_period);
1171
1172     byte_in_rate = queue->bytes_in / period;
1173
1174     if (queue->byte_in_rate == 0.0)
1175       queue->byte_in_rate = byte_in_rate;
1176     else
1177       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1178           (double) queue->byte_in_period, period);
1179
1180     /* another data point, cap at 16 for long time running average */
1181     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1182       queue->byte_in_period += period;
1183
1184     /* reset the values to calculate rate over the next interval */
1185     queue->last_in_elapsed = elapsed;
1186     queue->bytes_in = 0;
1187   }
1188
1189   if (queue->byte_in_rate > 0.0) {
1190     queue->cur_level.rate_time =
1191         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1192   }
1193   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1194       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1195 }
1196
1197 static void
1198 update_out_rates (GstQueue2 * queue)
1199 {
1200   gdouble elapsed, period;
1201   gdouble byte_out_rate;
1202
1203   if (!queue->out_timer_started) {
1204     queue->out_timer_started = TRUE;
1205     g_timer_start (queue->out_timer);
1206     return;
1207   }
1208
1209   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1210
1211   /* recalc after each interval. */
1212   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1213     period = elapsed - queue->last_out_elapsed;
1214
1215     GST_DEBUG_OBJECT (queue,
1216         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1217
1218     byte_out_rate = queue->bytes_out / period;
1219
1220     if (queue->byte_out_rate == 0.0)
1221       queue->byte_out_rate = byte_out_rate;
1222     else
1223       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1224
1225     /* reset the values to calculate rate over the next interval */
1226     queue->last_out_elapsed = elapsed;
1227     queue->bytes_out = 0;
1228   }
1229   if (queue->byte_in_rate > 0.0) {
1230     queue->cur_level.rate_time =
1231         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1232   }
1233   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1234       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1235 }
1236
1237 static void
1238 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1239 {
1240   guint64 reading_pos, max_reading_pos;
1241
1242   reading_pos = pos;
1243   max_reading_pos = range->max_reading_pos;
1244
1245   max_reading_pos = MAX (max_reading_pos, reading_pos);
1246
1247   GST_DEBUG_OBJECT (queue,
1248       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1249       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1250   range->max_reading_pos = max_reading_pos;
1251
1252   update_cur_level (queue, range);
1253 }
1254
1255 static gboolean
1256 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1257 {
1258   GstEvent *event;
1259   gboolean res;
1260
1261   /* until we receive the FLUSH_STOP from this seek, we skip data */
1262   queue->seeking = TRUE;
1263   GST_QUEUE2_MUTEX_UNLOCK (queue);
1264
1265   debug_ranges (queue);
1266
1267   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1268
1269   event =
1270       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1271       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1272       GST_SEEK_TYPE_NONE, -1);
1273
1274   res = gst_pad_push_event (queue->sinkpad, event);
1275   GST_QUEUE2_MUTEX_LOCK (queue);
1276
1277   if (res) {
1278     /* Between us sending the seek event and re-acquiring the lock, the source
1279      * thread might already have pushed data and moved along the range's
1280      * writing_pos beyond the seek offset. In that case we don't want to set
1281      * the writing position back to the requested seek position, as it would
1282      * cause data to be written to the wrong offset in the file or ring buffer.
1283      * We still do the add_range call to switch the current range to the
1284      * requested range, or create one if one doesn't exist yet. */
1285     queue->current = add_range (queue, offset, FALSE);
1286   }
1287
1288   return res;
1289 }
1290
1291 /* get the threshold for when we decide to seek rather than wait */
1292 static guint64
1293 get_seek_threshold (GstQueue2 * queue)
1294 {
1295   guint64 threshold;
1296
1297   /* FIXME, find a good threshold based on the incoming rate. */
1298   threshold = 1024 * 512;
1299
1300   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1301     threshold = MIN (threshold,
1302         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1303   }
1304   return threshold;
1305 }
1306
1307 /* see if there is enough data in the file to read a full buffer */
1308 static gboolean
1309 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1310 {
1311   GstQueue2Range *range;
1312
1313   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1314       offset, length);
1315
1316   if ((range = find_range (queue, offset))) {
1317     if (queue->current != range) {
1318       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1319       perform_seek_to_offset (queue, range->writing_pos);
1320     }
1321
1322     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1323         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1324
1325     /* we have a range for offset */
1326     GST_DEBUG_OBJECT (queue,
1327         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1328         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1329
1330     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1331       return TRUE;
1332
1333     if (offset + length <= range->writing_pos)
1334       return TRUE;
1335     else
1336       GST_DEBUG_OBJECT (queue,
1337           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1338           (offset + length) - range->writing_pos);
1339
1340   } else {
1341     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1342         " len %u", offset, length);
1343     /* we don't have the range, see how far away we are */
1344     if (!queue->is_eos && queue->current) {
1345       guint64 threshold = get_seek_threshold (queue);
1346
1347       if (offset >= queue->current->offset && offset <=
1348           queue->current->writing_pos + threshold) {
1349         GST_INFO_OBJECT (queue,
1350             "requested data is within range, wait for data");
1351         return FALSE;
1352       }
1353     }
1354
1355     /* too far away, do a seek */
1356     perform_seek_to_offset (queue, offset);
1357   }
1358
1359   return FALSE;
1360 }
1361
1362 #ifdef HAVE_FSEEKO
1363 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1364 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1365 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1366 #else
1367 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1368 #endif
1369
1370 static GstFlowReturn
1371 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1372     guint8 * dst, gint64 * read_return)
1373 {
1374   guint8 *ring_buffer;
1375   size_t res;
1376
1377   ring_buffer = queue->ring_buffer;
1378
1379   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1380     goto seek_failed;
1381
1382   /* this should not block */
1383   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1384       length, offset);
1385   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1386     res = fread (dst, 1, length, queue->temp_file);
1387   } else {
1388     memcpy (dst, ring_buffer + offset, length);
1389     res = length;
1390   }
1391
1392   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1393
1394   if (G_UNLIKELY (res < length)) {
1395     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1396       goto could_not_read;
1397     /* check for errors or EOF */
1398     if (ferror (queue->temp_file))
1399       goto could_not_read;
1400     if (feof (queue->temp_file) && length > 0)
1401       goto eos;
1402   }
1403
1404   *read_return = res;
1405
1406   return GST_FLOW_OK;
1407
1408 seek_failed:
1409   {
1410     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1411     return GST_FLOW_ERROR;
1412   }
1413 could_not_read:
1414   {
1415     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1416     return GST_FLOW_ERROR;
1417   }
1418 eos:
1419   {
1420     GST_DEBUG ("non-regular file hits EOS");
1421     return GST_FLOW_EOS;
1422   }
1423 }
1424
1425 static GstFlowReturn
1426 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1427     GstBuffer ** buffer)
1428 {
1429   GstBuffer *buf;
1430   GstMapInfo info;
1431   guint8 *data;
1432   guint64 file_offset;
1433   guint block_length, remaining, read_length;
1434   guint64 rb_size;
1435   guint64 max_size;
1436   guint64 rpos;
1437   GstFlowReturn ret = GST_FLOW_OK;
1438
1439   /* allocate the output buffer of the requested size */
1440   if (*buffer == NULL)
1441     buf = gst_buffer_new_allocate (NULL, length, NULL);
1442   else
1443     buf = *buffer;
1444
1445   if (!gst_buffer_map (buf, &info, GST_MAP_WRITE))
1446     goto buffer_write_fail;
1447   data = info.data;
1448
1449   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1450       offset);
1451
1452   rpos = offset;
1453   rb_size = queue->ring_buffer_max_size;
1454   max_size = QUEUE_MAX_BYTES (queue);
1455
1456   remaining = length;
1457   while (remaining > 0) {
1458     /* configure how much/whether to read */
1459     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1460       read_length = 0;
1461
1462       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1463         guint64 level;
1464
1465         /* calculate how far away the offset is */
1466         if (queue->current->writing_pos > rpos)
1467           level = queue->current->writing_pos - rpos;
1468         else
1469           level = 0;
1470
1471         GST_DEBUG_OBJECT (queue,
1472             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1473             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1474             rpos, queue->current->writing_pos, level, max_size);
1475
1476         if (level >= max_size) {
1477           /* we don't have the data but if we have a ring buffer that is full, we
1478            * need to read */
1479           GST_DEBUG_OBJECT (queue,
1480               "ring buffer full, reading QUEUE_MAX_BYTES %"
1481               G_GUINT64_FORMAT " bytes", max_size);
1482           read_length = max_size;
1483         } else if (queue->is_eos) {
1484           /* won't get any more data so read any data we have */
1485           if (level) {
1486             GST_DEBUG_OBJECT (queue,
1487                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1488                 level);
1489             read_length = level;
1490             remaining = level;
1491             length = level;
1492           } else
1493             goto hit_eos;
1494         }
1495       }
1496
1497       if (read_length == 0) {
1498         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1499           GST_DEBUG_OBJECT (queue,
1500               "update current position [%" G_GUINT64_FORMAT "-%"
1501               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1502           update_cur_pos (queue, queue->current, rpos);
1503           GST_QUEUE2_SIGNAL_DEL (queue);
1504         }
1505
1506         if (queue->use_buffering)
1507           update_buffering (queue);
1508
1509         GST_DEBUG_OBJECT (queue, "waiting for add");
1510         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1511         continue;
1512       }
1513     } else {
1514       /* we have the requested data so read it */
1515       read_length = remaining;
1516     }
1517
1518     /* set range reading_pos to actual reading position for this read */
1519     queue->current->reading_pos = rpos;
1520
1521     /* configure how much and from where to read */
1522     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1523       file_offset =
1524           (queue->current->rb_offset + (rpos -
1525               queue->current->offset)) % rb_size;
1526       if (file_offset + read_length > rb_size) {
1527         block_length = rb_size - file_offset;
1528       } else {
1529         block_length = read_length;
1530       }
1531     } else {
1532       file_offset = rpos;
1533       block_length = read_length;
1534     }
1535
1536     /* while we still have data to read, we loop */
1537     while (read_length > 0) {
1538       gint64 read_return;
1539
1540       ret =
1541           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1542           data, &read_return);
1543       if (ret != GST_FLOW_OK)
1544         goto read_error;
1545
1546       file_offset += read_return;
1547       if (QUEUE_IS_USING_RING_BUFFER (queue))
1548         file_offset %= rb_size;
1549
1550       data += read_return;
1551       read_length -= read_return;
1552       block_length = read_length;
1553       remaining -= read_return;
1554
1555       rpos = (queue->current->reading_pos += read_return);
1556       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1557     }
1558     GST_QUEUE2_SIGNAL_DEL (queue);
1559     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1560   }
1561
1562   gst_buffer_unmap (buf, &info);
1563   gst_buffer_resize (buf, 0, length);
1564
1565   GST_BUFFER_OFFSET (buf) = offset;
1566   GST_BUFFER_OFFSET_END (buf) = offset + length;
1567
1568   *buffer = buf;
1569
1570   return ret;
1571
1572   /* ERRORS */
1573 hit_eos:
1574   {
1575     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1576     gst_buffer_unmap (buf, &info);
1577     if (*buffer == NULL)
1578       gst_buffer_unref (buf);
1579     return GST_FLOW_EOS;
1580   }
1581 out_flushing:
1582   {
1583     GST_DEBUG_OBJECT (queue, "we are flushing");
1584     gst_buffer_unmap (buf, &info);
1585     if (*buffer == NULL)
1586       gst_buffer_unref (buf);
1587     return GST_FLOW_FLUSHING;
1588   }
1589 read_error:
1590   {
1591     GST_DEBUG_OBJECT (queue, "we have a read error");
1592     gst_buffer_unmap (buf, &info);
1593     if (*buffer == NULL)
1594       gst_buffer_unref (buf);
1595     return ret;
1596   }
1597 buffer_write_fail:
1598   {
1599     GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL),
1600         ("Can't write to buffer"));
1601     if (*buffer == NULL)
1602       gst_buffer_unref (buf);
1603     return GST_FLOW_ERROR;
1604   }
1605 }
1606
1607 /* should be called with QUEUE_LOCK */
1608 static GstMiniObject *
1609 gst_queue2_read_item_from_file (GstQueue2 * queue)
1610 {
1611   GstMiniObject *item;
1612
1613   if (queue->stream_start_event != NULL) {
1614     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1615     queue->stream_start_event = NULL;
1616   } else if (queue->starting_segment != NULL) {
1617     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1618     queue->starting_segment = NULL;
1619   } else {
1620     GstFlowReturn ret;
1621     GstBuffer *buffer = NULL;
1622     guint64 reading_pos;
1623
1624     reading_pos = queue->current->reading_pos;
1625
1626     ret =
1627         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1628         &buffer);
1629
1630     switch (ret) {
1631       case GST_FLOW_OK:
1632         item = GST_MINI_OBJECT_CAST (buffer);
1633         break;
1634       case GST_FLOW_EOS:
1635         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1636         break;
1637       default:
1638         item = NULL;
1639         break;
1640     }
1641   }
1642   return item;
1643 }
1644
1645 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1646  * the temp filename. */
1647 static gboolean
1648 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1649 {
1650   gint fd = -1;
1651   gchar *name = NULL;
1652
1653   if (queue->temp_file)
1654     goto already_opened;
1655
1656   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1657
1658   /* If temp_template was set, allocate a filename and open that file */
1659
1660   /* nothing to do */
1661   if (queue->temp_template == NULL)
1662     goto no_directory;
1663
1664   /* make copy of the template, we don't want to change this */
1665   name = g_strdup (queue->temp_template);
1666
1667 #ifdef __BIONIC__
1668   fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1669 #else
1670   fd = g_mkstemp (name);
1671 #endif
1672
1673   if (fd == -1)
1674     goto mkstemp_failed;
1675
1676   /* open the file for update/writing */
1677   queue->temp_file = fdopen (fd, "wb+");
1678   /* error creating file */
1679   if (queue->temp_file == NULL)
1680     goto open_failed;
1681
1682   g_free (queue->temp_location);
1683   queue->temp_location = name;
1684
1685   GST_QUEUE2_MUTEX_UNLOCK (queue);
1686
1687   /* we can't emit the notify with the lock */
1688   g_object_notify (G_OBJECT (queue), "temp-location");
1689
1690   GST_QUEUE2_MUTEX_LOCK (queue);
1691
1692   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1693
1694   return TRUE;
1695
1696   /* ERRORS */
1697 already_opened:
1698   {
1699     GST_DEBUG_OBJECT (queue, "temp file was already open");
1700     return TRUE;
1701   }
1702 no_directory:
1703   {
1704     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1705         (_("No Temp directory specified.")), (NULL));
1706     return FALSE;
1707   }
1708 mkstemp_failed:
1709   {
1710     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1711         (_("Could not create temp file \"%s\"."), queue->temp_template),
1712         GST_ERROR_SYSTEM);
1713     g_free (name);
1714     return FALSE;
1715   }
1716 open_failed:
1717   {
1718     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1719         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1720     g_free (name);
1721     if (fd != -1)
1722       close (fd);
1723     return FALSE;
1724   }
1725 }
1726
1727 static void
1728 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1729 {
1730   /* nothing to do */
1731   if (queue->temp_file == NULL)
1732     return;
1733
1734   GST_DEBUG_OBJECT (queue, "closing temp file");
1735
1736   fflush (queue->temp_file);
1737   fclose (queue->temp_file);
1738
1739   if (queue->temp_remove) {
1740     if (remove (queue->temp_location) < 0) {
1741       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1742           queue->temp_location, g_strerror (errno));
1743     }
1744   }
1745
1746   queue->temp_file = NULL;
1747   clean_ranges (queue);
1748 }
1749
1750 static void
1751 gst_queue2_flush_temp_file (GstQueue2 * queue)
1752 {
1753   if (queue->temp_file == NULL)
1754     return;
1755
1756   GST_DEBUG_OBJECT (queue, "flushing temp file");
1757
1758   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1759 }
1760
1761 static void
1762 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1763 {
1764   if (!QUEUE_IS_USING_QUEUE (queue)) {
1765     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1766       gst_queue2_flush_temp_file (queue);
1767     init_ranges (queue);
1768   } else {
1769     GstQueue2Item *qitem;
1770
1771     while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
1772       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1773           && GST_EVENT_IS_STICKY (qitem->item)
1774           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1775           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1776         gst_pad_store_sticky_event (queue->srcpad,
1777             GST_EVENT_CAST (qitem->item));
1778       }
1779
1780       /* Then lose another reference because we are supposed to destroy that
1781          data when flushing */
1782       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1783         gst_mini_object_unref (qitem->item);
1784     }
1785   }
1786   queue->last_query = FALSE;
1787   g_cond_signal (&queue->query_handled);
1788   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1789   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1790   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1791   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1792   queue->sink_tainted = queue->src_tainted = TRUE;
1793   if (queue->starting_segment != NULL)
1794     gst_event_unref (queue->starting_segment);
1795   queue->starting_segment = NULL;
1796   queue->segment_event_received = FALSE;
1797   gst_event_replace (&queue->stream_start_event, NULL);
1798
1799   /* we deleted a lot of something */
1800   GST_QUEUE2_SIGNAL_DEL (queue);
1801 }
1802
1803 static gboolean
1804 gst_queue2_wait_free_space (GstQueue2 * queue)
1805 {
1806   /* We make space available if we're "full" according to whatever
1807    * the user defined as "full". */
1808   if (gst_queue2_is_filled (queue)) {
1809     gboolean started;
1810
1811     /* pause the timer while we wait. The fact that we are waiting does not mean
1812      * the byterate on the input pad is lower */
1813     if ((started = queue->in_timer_started))
1814       g_timer_stop (queue->in_timer);
1815
1816     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1817         "queue is full, waiting for free space");
1818     do {
1819       /* Wait for space to be available, we could be unlocked because of a flush. */
1820       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1821     }
1822     while (gst_queue2_is_filled (queue));
1823
1824     /* and continue if we were running before */
1825     if (started)
1826       g_timer_continue (queue->in_timer);
1827   }
1828   return TRUE;
1829
1830   /* ERRORS */
1831 out_flushing:
1832   {
1833     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1834     return FALSE;
1835   }
1836 }
1837
1838 static gboolean
1839 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1840 {
1841   GstMapInfo info;
1842   guint8 *data, *ring_buffer;
1843   guint size, rb_size;
1844   guint64 writing_pos, new_writing_pos;
1845   GstQueue2Range *range, *prev, *next;
1846   gboolean do_seek = FALSE;
1847
1848   if (QUEUE_IS_USING_RING_BUFFER (queue))
1849     writing_pos = queue->current->rb_writing_pos;
1850   else
1851     writing_pos = queue->current->writing_pos;
1852   ring_buffer = queue->ring_buffer;
1853   rb_size = queue->ring_buffer_max_size;
1854
1855   if (!gst_buffer_map (buffer, &info, GST_MAP_READ))
1856     goto buffer_read_error;
1857
1858   size = info.size;
1859   data = info.data;
1860
1861   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1862       writing_pos);
1863
1864   /* sanity check */
1865   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1866       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1867     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1868         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1869         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1870   }
1871
1872   while (size > 0) {
1873     guint to_write;
1874
1875     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1876       gint64 space;
1877
1878       /* calculate the space in the ring buffer not used by data from
1879        * the current range */
1880       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1881         /* wait until there is some free space */
1882         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1883       }
1884       /* get the amount of space we have */
1885       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1886
1887       /* calculate if we need to split or if we can write the entire
1888        * buffer now */
1889       to_write = MIN (size, space);
1890
1891       /* the writing position in the ring buffer after writing (part
1892        * or all of) the buffer */
1893       new_writing_pos = (writing_pos + to_write) % rb_size;
1894
1895       prev = NULL;
1896       range = queue->ranges;
1897
1898       /* if we need to overwrite data in the ring buffer, we need to
1899        * update the ranges
1900        *
1901        * warning: this code is complicated and includes some
1902        * simplifications - pen, paper and diagrams for the cases
1903        * recommended! */
1904       while (range) {
1905         guint64 range_data_start, range_data_end;
1906         GstQueue2Range *range_to_destroy = NULL;
1907
1908         if (range == queue->current)
1909           goto next_range;
1910
1911         range_data_start = range->rb_offset;
1912         range_data_end = range->rb_writing_pos;
1913
1914         /* handle the special case where the range has no data in it */
1915         if (range->writing_pos == range->offset) {
1916           if (range != queue->current) {
1917             GST_DEBUG_OBJECT (queue,
1918                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1919                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1920             /* remove range */
1921             range_to_destroy = range;
1922             if (prev)
1923               prev->next = range->next;
1924           }
1925           goto next_range;
1926         }
1927
1928         if (range_data_end > range_data_start) {
1929           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1930             goto next_range;
1931
1932           if (new_writing_pos > range_data_start) {
1933             if (new_writing_pos >= range_data_end) {
1934               GST_DEBUG_OBJECT (queue,
1935                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1936                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1937               /* remove range */
1938               range_to_destroy = range;
1939               if (prev)
1940                 prev->next = range->next;
1941             } else {
1942               GST_DEBUG_OBJECT (queue,
1943                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1944                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1945                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1946                   range->offset + new_writing_pos - range_data_start,
1947                   new_writing_pos);
1948               range->offset += (new_writing_pos - range_data_start);
1949               range->rb_offset = new_writing_pos;
1950             }
1951           }
1952         } else {
1953           guint64 new_wpos_virt = writing_pos + to_write;
1954
1955           if (new_wpos_virt <= range_data_start)
1956             goto next_range;
1957
1958           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1959             GST_DEBUG_OBJECT (queue,
1960                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1961                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1962             /* remove range */
1963             range_to_destroy = range;
1964             if (prev)
1965               prev->next = range->next;
1966           } else {
1967             GST_DEBUG_OBJECT (queue,
1968                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1969                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1970                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1971                 range->offset + new_writing_pos - range_data_start,
1972                 new_writing_pos);
1973             range->offset += (new_wpos_virt - range_data_start);
1974             range->rb_offset = new_writing_pos;
1975           }
1976         }
1977
1978       next_range:
1979         if (!range_to_destroy)
1980           prev = range;
1981
1982         range = range->next;
1983         if (range_to_destroy) {
1984           if (range_to_destroy == queue->ranges)
1985             queue->ranges = range;
1986           g_slice_free (GstQueue2Range, range_to_destroy);
1987           range_to_destroy = NULL;
1988         }
1989       }
1990     } else {
1991       to_write = size;
1992       new_writing_pos = writing_pos + to_write;
1993     }
1994
1995     if (QUEUE_IS_USING_TEMP_FILE (queue)
1996         && FSEEK_FILE (queue->temp_file, writing_pos))
1997       goto seek_failed;
1998
1999     if (new_writing_pos > writing_pos) {
2000       GST_INFO_OBJECT (queue,
2001           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
2002           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
2003           queue->current->writing_pos, queue->current->rb_writing_pos);
2004       /* either not using ring buffer or no wrapping, just write */
2005       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2006         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
2007           goto handle_error;
2008       } else {
2009         memcpy (ring_buffer + writing_pos, data, to_write);
2010       }
2011
2012       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
2013         /* try to merge with next range */
2014         while ((next = queue->current->next)) {
2015           GST_INFO_OBJECT (queue,
2016               "checking merge with next range %" G_GUINT64_FORMAT " < %"
2017               G_GUINT64_FORMAT, new_writing_pos, next->offset);
2018           if (new_writing_pos < next->offset)
2019             break;
2020
2021           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
2022               next->writing_pos);
2023
2024           /* remove the group */
2025           queue->current->next = next->next;
2026
2027           /* We use the threshold to decide if we want to do a seek or simply
2028            * read the data again. If there is not so much data in the range we
2029            * prefer to avoid to seek and read it again. */
2030           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
2031             /* the new range had more data than the threshold, it's worth keeping
2032              * it and doing a seek. */
2033             new_writing_pos = next->writing_pos;
2034             do_seek = TRUE;
2035           }
2036           g_slice_free (GstQueue2Range, next);
2037         }
2038         goto update_and_signal;
2039       }
2040     } else {
2041       /* wrapping */
2042       guint block_one, block_two;
2043
2044       block_one = rb_size - writing_pos;
2045       block_two = to_write - block_one;
2046
2047       if (block_one > 0) {
2048         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
2049         /* write data to end of ring buffer */
2050         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2051           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
2052             goto handle_error;
2053         } else {
2054           memcpy (ring_buffer + writing_pos, data, block_one);
2055         }
2056       }
2057
2058       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
2059         goto seek_failed;
2060
2061       if (block_two > 0) {
2062         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
2063         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2064           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
2065             goto handle_error;
2066         } else {
2067           memcpy (ring_buffer, data + block_one, block_two);
2068         }
2069       }
2070     }
2071
2072   update_and_signal:
2073     /* update the writing positions */
2074     size -= to_write;
2075     GST_INFO_OBJECT (queue,
2076         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
2077         to_write, writing_pos, size);
2078
2079     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2080       data += to_write;
2081       queue->current->writing_pos += to_write;
2082       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
2083     } else {
2084       queue->current->writing_pos = writing_pos = new_writing_pos;
2085     }
2086     if (do_seek)
2087       perform_seek_to_offset (queue, new_writing_pos);
2088
2089     update_cur_level (queue, queue->current);
2090
2091     /* update the buffering status */
2092     if (queue->use_buffering) {
2093       GstMessage *msg;
2094       update_buffering (queue);
2095       msg = gst_queue2_get_buffering_message (queue);
2096       if (msg) {
2097         GST_QUEUE2_MUTEX_UNLOCK (queue);
2098         g_mutex_lock (&queue->buffering_post_lock);
2099         gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
2100         g_mutex_unlock (&queue->buffering_post_lock);
2101         GST_QUEUE2_MUTEX_LOCK (queue);
2102       }
2103     }
2104
2105     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2106         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2107
2108     GST_QUEUE2_SIGNAL_ADD (queue);
2109   }
2110
2111   gst_buffer_unmap (buffer, &info);
2112
2113   return TRUE;
2114
2115   /* ERRORS */
2116 out_flushing:
2117   {
2118     GST_DEBUG_OBJECT (queue, "we are flushing");
2119     gst_buffer_unmap (buffer, &info);
2120     /* FIXME - GST_FLOW_EOS ? */
2121     return FALSE;
2122   }
2123 seek_failed:
2124   {
2125     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2126     gst_buffer_unmap (buffer, &info);
2127     return FALSE;
2128   }
2129 handle_error:
2130   {
2131     switch (errno) {
2132       case ENOSPC:{
2133         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2134         break;
2135       }
2136       default:{
2137         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2138             (_("Error while writing to download file.")),
2139             ("%s", g_strerror (errno)));
2140       }
2141     }
2142     gst_buffer_unmap (buffer, &info);
2143     return FALSE;
2144   }
2145 buffer_read_error:
2146   {
2147     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL),
2148         ("Can't read from buffer"));
2149     return FALSE;
2150   }
2151 }
2152
2153 static gboolean
2154 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2155 {
2156   GstQueue2 *queue = q;
2157
2158   GST_TRACE_OBJECT (queue,
2159       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2160       gst_buffer_get_size (*buf));
2161
2162   if (!gst_queue2_create_write (queue, *buf)) {
2163     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2164     return FALSE;
2165   }
2166   return TRUE;
2167 }
2168
2169 /* enqueue an item an update the level stats */
2170 static void
2171 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2172     GstQueue2ItemType item_type)
2173 {
2174   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2175     GstBuffer *buffer;
2176     guint size;
2177
2178     buffer = GST_BUFFER_CAST (item);
2179     size = gst_buffer_get_size (buffer);
2180
2181     /* add buffer to the statistics */
2182     if (QUEUE_IS_USING_QUEUE (queue)) {
2183       queue->cur_level.buffers++;
2184       queue->cur_level.bytes += size;
2185     }
2186     queue->bytes_in += size;
2187
2188     /* apply new buffer to segment stats */
2189     apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE);
2190     /* update the byterate stats */
2191     update_in_rates (queue, FALSE);
2192
2193     if (!QUEUE_IS_USING_QUEUE (queue)) {
2194       /* FIXME - check return value? */
2195       gst_queue2_create_write (queue, buffer);
2196     }
2197   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2198     GstBufferList *buffer_list;
2199     guint size;
2200
2201     buffer_list = GST_BUFFER_LIST_CAST (item);
2202
2203     size = gst_buffer_list_calculate_size (buffer_list);
2204     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2205
2206     /* add buffer to the statistics */
2207     if (QUEUE_IS_USING_QUEUE (queue)) {
2208       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2209       queue->cur_level.bytes += size;
2210     }
2211     queue->bytes_in += size;
2212
2213     /* apply new buffer to segment stats */
2214     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2215
2216     /* update the byterate stats */
2217     update_in_rates (queue, FALSE);
2218
2219     if (!QUEUE_IS_USING_QUEUE (queue)) {
2220       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2221     }
2222   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2223     GstEvent *event;
2224
2225     event = GST_EVENT_CAST (item);
2226
2227     switch (GST_EVENT_TYPE (event)) {
2228       case GST_EVENT_EOS:
2229         /* Zero the thresholds, this makes sure the queue is completely
2230          * filled and we can read all data from the queue. */
2231         GST_DEBUG_OBJECT (queue, "we have EOS");
2232         queue->is_eos = TRUE;
2233         /* Force updating the input bitrate */
2234         update_in_rates (queue, TRUE);
2235         break;
2236       case GST_EVENT_SEGMENT:
2237         apply_segment (queue, event, &queue->sink_segment, TRUE);
2238         /* This is our first new segment, we hold it
2239          * as we can't save it on the temp file */
2240         if (!QUEUE_IS_USING_QUEUE (queue)) {
2241           if (queue->segment_event_received)
2242             goto unexpected_event;
2243
2244           queue->segment_event_received = TRUE;
2245           if (queue->starting_segment != NULL)
2246             gst_event_unref (queue->starting_segment);
2247           queue->starting_segment = event;
2248           item = NULL;
2249         }
2250         /* a new segment allows us to accept more buffers if we got EOS
2251          * from downstream */
2252         queue->unexpected = FALSE;
2253         break;
2254       case GST_EVENT_GAP:
2255         apply_gap (queue, event, &queue->sink_segment, TRUE);
2256         break;
2257       case GST_EVENT_STREAM_START:
2258         if (!QUEUE_IS_USING_QUEUE (queue)) {
2259           gst_event_replace (&queue->stream_start_event, event);
2260           gst_event_unref (event);
2261           item = NULL;
2262         }
2263         break;
2264       case GST_EVENT_CAPS:{
2265         GstCaps *caps;
2266
2267         gst_event_parse_caps (event, &caps);
2268         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2269
2270         if (!QUEUE_IS_USING_QUEUE (queue)) {
2271           GST_LOG ("Dropping caps event, not using queue");
2272           gst_event_unref (event);
2273           item = NULL;
2274         }
2275         break;
2276       }
2277       default:
2278         if (!QUEUE_IS_USING_QUEUE (queue))
2279           goto unexpected_event;
2280         break;
2281     }
2282   } else if (GST_IS_QUERY (item)) {
2283     /* Can't happen as we check that in the caller */
2284     if (!QUEUE_IS_USING_QUEUE (queue))
2285       g_assert_not_reached ();
2286   } else {
2287     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2288         item, GST_OBJECT_NAME (queue));
2289     /* we can't really unref since we don't know what it is */
2290     item = NULL;
2291   }
2292
2293   if (item) {
2294     /* update the buffering status */
2295     if (queue->use_buffering)
2296       update_buffering (queue);
2297
2298     if (QUEUE_IS_USING_QUEUE (queue)) {
2299       GstQueue2Item qitem;
2300
2301       qitem.type = item_type;
2302       qitem.item = item;
2303       gst_queue_array_push_tail_struct (queue->queue, &qitem);
2304     } else {
2305       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2306     }
2307
2308     GST_QUEUE2_SIGNAL_ADD (queue);
2309   }
2310
2311   return;
2312
2313   /* ERRORS */
2314 unexpected_event:
2315   {
2316     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2317
2318     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2319         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2320         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2321     gst_event_unref (GST_EVENT_CAST (item));
2322     return;
2323   }
2324 }
2325
2326 /* dequeue an item from the queue and update level stats */
2327 static GstMiniObject *
2328 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2329 {
2330   GstMiniObject *item;
2331
2332   if (!QUEUE_IS_USING_QUEUE (queue)) {
2333     item = gst_queue2_read_item_from_file (queue);
2334   } else {
2335     GstQueue2Item *qitem = gst_queue_array_pop_head_struct (queue->queue);
2336
2337     if (qitem == NULL)
2338       goto no_item;
2339
2340     item = qitem->item;
2341   }
2342
2343   if (item == NULL)
2344     goto no_item;
2345
2346   if (GST_IS_BUFFER (item)) {
2347     GstBuffer *buffer;
2348     guint size;
2349
2350     buffer = GST_BUFFER_CAST (item);
2351     size = gst_buffer_get_size (buffer);
2352     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2353
2354     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2355         "retrieved buffer %p from queue", buffer);
2356
2357     if (QUEUE_IS_USING_QUEUE (queue)) {
2358       queue->cur_level.buffers--;
2359       queue->cur_level.bytes -= size;
2360     }
2361     queue->bytes_out += size;
2362
2363     apply_buffer (queue, buffer, &queue->src_segment, size, FALSE);
2364     /* update the byterate stats */
2365     update_out_rates (queue);
2366     /* update the buffering */
2367     if (queue->use_buffering)
2368       update_buffering (queue);
2369
2370   } else if (GST_IS_EVENT (item)) {
2371     GstEvent *event = GST_EVENT_CAST (item);
2372
2373     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2374
2375     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2376         "retrieved event %p from queue", event);
2377
2378     switch (GST_EVENT_TYPE (event)) {
2379       case GST_EVENT_EOS:
2380         /* queue is empty now that we dequeued the EOS */
2381         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2382         break;
2383       case GST_EVENT_SEGMENT:
2384         apply_segment (queue, event, &queue->src_segment, FALSE);
2385         break;
2386       case GST_EVENT_GAP:
2387         apply_gap (queue, event, &queue->src_segment, FALSE);
2388         break;
2389       default:
2390         break;
2391     }
2392   } else if (GST_IS_BUFFER_LIST (item)) {
2393     GstBufferList *buffer_list;
2394     guint size;
2395
2396     buffer_list = GST_BUFFER_LIST_CAST (item);
2397     size = gst_buffer_list_calculate_size (buffer_list);
2398     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2399
2400     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2401         "retrieved buffer list %p from queue", buffer_list);
2402
2403     if (QUEUE_IS_USING_QUEUE (queue)) {
2404       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2405       queue->cur_level.bytes -= size;
2406     }
2407     queue->bytes_out += size;
2408
2409     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2410     /* update the byterate stats */
2411     update_out_rates (queue);
2412     /* update the buffering */
2413     if (queue->use_buffering)
2414       update_buffering (queue);
2415   } else if (GST_IS_QUERY (item)) {
2416     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2417         "retrieved query %p from queue", item);
2418     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2419   } else {
2420     g_warning
2421         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2422         item, GST_OBJECT_NAME (queue));
2423     item = NULL;
2424     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2425   }
2426   GST_QUEUE2_SIGNAL_DEL (queue);
2427
2428   return item;
2429
2430   /* ERRORS */
2431 no_item:
2432   {
2433     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2434     return NULL;
2435   }
2436 }
2437
2438 static GstFlowReturn
2439 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2440     GstEvent * event)
2441 {
2442   gboolean ret = TRUE;
2443   GstQueue2 *queue;
2444
2445   queue = GST_QUEUE2 (parent);
2446
2447   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
2448       GST_EVENT_TYPE_NAME (event));
2449
2450   switch (GST_EVENT_TYPE (event)) {
2451     case GST_EVENT_FLUSH_START:
2452     {
2453       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2454         /* forward event */
2455         ret = gst_pad_push_event (queue->srcpad, event);
2456
2457         /* now unblock the chain function */
2458         GST_QUEUE2_MUTEX_LOCK (queue);
2459         queue->srcresult = GST_FLOW_FLUSHING;
2460         queue->sinkresult = GST_FLOW_FLUSHING;
2461         /* unblock the loop and chain functions */
2462         GST_QUEUE2_SIGNAL_ADD (queue);
2463         GST_QUEUE2_SIGNAL_DEL (queue);
2464         GST_QUEUE2_MUTEX_UNLOCK (queue);
2465
2466         /* make sure it pauses, this should happen since we sent
2467          * flush_start downstream. */
2468         gst_pad_pause_task (queue->srcpad);
2469         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2470
2471         GST_QUEUE2_MUTEX_LOCK (queue);
2472         queue->last_query = FALSE;
2473         g_cond_signal (&queue->query_handled);
2474         GST_QUEUE2_MUTEX_UNLOCK (queue);
2475       } else {
2476         GST_QUEUE2_MUTEX_LOCK (queue);
2477         /* flush the sink pad */
2478         queue->sinkresult = GST_FLOW_FLUSHING;
2479         GST_QUEUE2_SIGNAL_DEL (queue);
2480         queue->last_query = FALSE;
2481         g_cond_signal (&queue->query_handled);
2482         GST_QUEUE2_MUTEX_UNLOCK (queue);
2483
2484         gst_event_unref (event);
2485       }
2486       break;
2487     }
2488     case GST_EVENT_FLUSH_STOP:
2489     {
2490       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2491         /* forward event */
2492         ret = gst_pad_push_event (queue->srcpad, event);
2493
2494         GST_QUEUE2_MUTEX_LOCK (queue);
2495         gst_queue2_locked_flush (queue, FALSE, TRUE);
2496         queue->srcresult = GST_FLOW_OK;
2497         queue->sinkresult = GST_FLOW_OK;
2498         queue->is_eos = FALSE;
2499         queue->unexpected = FALSE;
2500         queue->seeking = FALSE;
2501         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2502         /* reset rate counters */
2503         reset_rate_timer (queue);
2504         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2505             queue->srcpad, NULL);
2506         GST_QUEUE2_MUTEX_UNLOCK (queue);
2507       } else {
2508         GST_QUEUE2_MUTEX_LOCK (queue);
2509         queue->segment_event_received = FALSE;
2510         queue->is_eos = FALSE;
2511         queue->unexpected = FALSE;
2512         queue->sinkresult = GST_FLOW_OK;
2513         queue->seeking = FALSE;
2514         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2515         GST_QUEUE2_MUTEX_UNLOCK (queue);
2516
2517         gst_event_unref (event);
2518       }
2519       break;
2520     }
2521     case GST_EVENT_TAG:{
2522       if (queue->use_tags_bitrate) {
2523         GstTagList *tags;
2524         guint bitrate;
2525
2526         gst_event_parse_tag (event, &tags);
2527         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2528             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2529           GST_QUEUE2_MUTEX_LOCK (queue);
2530           queue->sink_tags_bitrate = bitrate;
2531           GST_QUEUE2_MUTEX_UNLOCK (queue);
2532           GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
2533         }
2534       }
2535       /* Fall-through */
2536     }
2537     default:
2538       if (GST_EVENT_IS_SERIALIZED (event)) {
2539         /* serialized events go in the queue */
2540
2541         /* STREAM_START and SEGMENT reset the EOS status of a
2542          * pad. Change the cached sinkpad flow result accordingly */
2543         if (queue->sinkresult == GST_FLOW_EOS
2544             && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
2545                 || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
2546           queue->sinkresult = GST_FLOW_OK;
2547
2548         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2549         if (queue->srcresult != GST_FLOW_OK) {
2550           /* Errors in sticky event pushing are no problem and ignored here
2551            * as they will cause more meaningful errors during data flow.
2552            * For EOS events, that are not followed by data flow, we still
2553            * return FALSE here though and report an error.
2554            */
2555           if (!GST_EVENT_IS_STICKY (event)) {
2556             goto out_flow_error;
2557           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2558             if (queue->srcresult == GST_FLOW_NOT_LINKED
2559                 || queue->srcresult < GST_FLOW_EOS) {
2560               GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
2561             }
2562             goto out_flow_error;
2563           }
2564         }
2565
2566         /* refuse more events on EOS unless they unset the EOS status */
2567         if (queue->is_eos) {
2568           switch (GST_EVENT_TYPE (event)) {
2569             case GST_EVENT_STREAM_START:
2570             case GST_EVENT_SEGMENT:
2571               /* Restart the loop */
2572               if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2573                 queue->srcresult = GST_FLOW_OK;
2574                 queue->is_eos = FALSE;
2575                 queue->unexpected = FALSE;
2576                 queue->seeking = FALSE;
2577                 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2578                 /* reset rate counters */
2579                 reset_rate_timer (queue);
2580                 gst_pad_start_task (queue->srcpad,
2581                     (GstTaskFunction) gst_queue2_loop, queue->srcpad, NULL);
2582               } else {
2583                 queue->is_eos = FALSE;
2584                 queue->unexpected = FALSE;
2585                 queue->seeking = FALSE;
2586                 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2587               }
2588
2589               break;
2590             default:
2591               goto out_eos;
2592           }
2593         }
2594
2595         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2596         GST_QUEUE2_MUTEX_UNLOCK (queue);
2597         gst_queue2_post_buffering (queue);
2598       } else {
2599         /* non-serialized events are passed downstream. */
2600         ret = gst_pad_push_event (queue->srcpad, event);
2601       }
2602       break;
2603   }
2604   if (ret == FALSE)
2605     return GST_FLOW_ERROR;
2606   return GST_FLOW_OK;
2607
2608   /* ERRORS */
2609 out_flushing:
2610   {
2611     GstFlowReturn ret = queue->sinkresult;
2612     GST_DEBUG_OBJECT (queue, "refusing event, we are %s",
2613         gst_flow_get_name (ret));
2614     GST_QUEUE2_MUTEX_UNLOCK (queue);
2615     gst_event_unref (event);
2616     return ret;
2617   }
2618 out_eos:
2619   {
2620     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2621     GST_QUEUE2_MUTEX_UNLOCK (queue);
2622     gst_event_unref (event);
2623     return GST_FLOW_EOS;
2624   }
2625 out_flow_error:
2626   {
2627     GST_LOG_OBJECT (queue,
2628         "refusing event, we have a downstream flow error: %s",
2629         gst_flow_get_name (queue->srcresult));
2630     GST_QUEUE2_MUTEX_UNLOCK (queue);
2631     gst_event_unref (event);
2632     return queue->srcresult;
2633   }
2634 }
2635
2636 static gboolean
2637 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2638     GstQuery * query)
2639 {
2640   GstQueue2 *queue;
2641   gboolean res;
2642
2643   queue = GST_QUEUE2 (parent);
2644
2645   switch (GST_QUERY_TYPE (query)) {
2646     default:
2647       if (GST_QUERY_IS_SERIALIZED (query)) {
2648         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2649         /* serialized events go in the queue. We need to be certain that we
2650          * don't cause deadlocks waiting for the query return value. We check if
2651          * the queue is empty (nothing is blocking downstream and the query can
2652          * be pushed for sure) or we are not buffering. If we are buffering,
2653          * the pipeline waits to unblock downstream until our queue fills up
2654          * completely, which can not happen if we block on the query..
2655          * Therefore we only potentially block when we are not buffering. */
2656         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2657         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2658                 || !queue->use_buffering)) {
2659           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2660             gst_queue2_locked_enqueue (queue, query,
2661                 GST_QUEUE2_ITEM_TYPE_QUERY);
2662
2663             STATUS (queue, queue->sinkpad, "wait for QUERY");
2664             while (queue->sinkresult == GST_FLOW_OK &&
2665                 queue->last_handled_query != query)
2666               g_cond_wait (&queue->query_handled, &queue->qlock);
2667             queue->last_handled_query = NULL;
2668             if (queue->sinkresult != GST_FLOW_OK)
2669               goto out_flushing;
2670             res = queue->last_query;
2671           } else {
2672             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2673             res = FALSE;
2674           }
2675         } else {
2676           GST_DEBUG_OBJECT (queue,
2677               "refusing query, we are not using the queue");
2678           res = FALSE;
2679         }
2680         GST_QUEUE2_MUTEX_UNLOCK (queue);
2681         gst_queue2_post_buffering (queue);
2682       } else {
2683         res = gst_pad_query_default (pad, parent, query);
2684       }
2685       break;
2686   }
2687   return res;
2688
2689   /* ERRORS */
2690 out_flushing:
2691   {
2692     GST_DEBUG_OBJECT (queue, "refusing query, we are %s",
2693         gst_flow_get_name (queue->sinkresult));
2694     GST_QUEUE2_MUTEX_UNLOCK (queue);
2695     return FALSE;
2696   }
2697 }
2698
2699 static gboolean
2700 gst_queue2_is_empty (GstQueue2 * queue)
2701 {
2702   /* never empty on EOS */
2703   if (queue->is_eos)
2704     return FALSE;
2705
2706   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2707     return queue->current->writing_pos <= queue->current->max_reading_pos;
2708   } else {
2709     if (gst_queue_array_get_length (queue->queue) == 0)
2710       return TRUE;
2711   }
2712
2713   return FALSE;
2714 }
2715
2716 static gboolean
2717 gst_queue2_is_filled (GstQueue2 * queue)
2718 {
2719   gboolean res;
2720
2721   /* always filled on EOS */
2722   if (queue->is_eos)
2723     return TRUE;
2724
2725 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2726     (queue->cur_level.format) >= ((alt_max) ? \
2727       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2728
2729   /* if using a ring buffer we're filled if all ring buffer space is used
2730    * _by the current range_ */
2731   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2732     guint64 rb_size = queue->ring_buffer_max_size;
2733     GST_DEBUG_OBJECT (queue,
2734         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2735         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2736     return CHECK_FILLED (bytes, rb_size);
2737   }
2738
2739   /* if using file, we're never filled if we don't have EOS */
2740   if (QUEUE_IS_USING_TEMP_FILE (queue))
2741     return FALSE;
2742
2743   /* we are never filled when we have no buffers at all */
2744   if (queue->cur_level.buffers == 0)
2745     return FALSE;
2746
2747   /* we are filled if one of the current levels exceeds the max */
2748   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2749       || CHECK_FILLED (time, 0);
2750
2751   /* if we need to, use the rate estimate to check against the max time we are
2752    * allowed to queue */
2753   if (queue->use_rate_estimate)
2754     res |= CHECK_FILLED (rate_time, 0);
2755
2756 #undef CHECK_FILLED
2757   return res;
2758 }
2759
2760 static GstFlowReturn
2761 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2762     GstMiniObject * item, GstQueue2ItemType item_type)
2763 {
2764   /* we have to lock the queue since we span threads */
2765   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2766   /* when we received EOS, we refuse more data */
2767   if (queue->is_eos)
2768     goto out_eos;
2769   /* when we received unexpected from downstream, refuse more buffers */
2770   if (queue->unexpected)
2771     goto out_unexpected;
2772
2773   /* while we didn't receive the newsegment, we're seeking and we skip data */
2774   if (queue->seeking)
2775     goto out_seeking;
2776
2777   if (!gst_queue2_wait_free_space (queue))
2778     goto out_flushing;
2779
2780   /* put buffer in queue now */
2781   gst_queue2_locked_enqueue (queue, item, item_type);
2782   GST_QUEUE2_MUTEX_UNLOCK (queue);
2783   gst_queue2_post_buffering (queue);
2784
2785   return GST_FLOW_OK;
2786
2787   /* special conditions */
2788 out_flushing:
2789   {
2790     GstFlowReturn ret = queue->sinkresult;
2791
2792     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2793         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2794     GST_QUEUE2_MUTEX_UNLOCK (queue);
2795     gst_mini_object_unref (item);
2796
2797     return ret;
2798   }
2799 out_eos:
2800   {
2801     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2802     GST_QUEUE2_MUTEX_UNLOCK (queue);
2803     gst_mini_object_unref (item);
2804
2805     return GST_FLOW_EOS;
2806   }
2807 out_seeking:
2808   {
2809     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2810     GST_QUEUE2_MUTEX_UNLOCK (queue);
2811     gst_mini_object_unref (item);
2812
2813     return GST_FLOW_OK;
2814   }
2815 out_unexpected:
2816   {
2817     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2818     GST_QUEUE2_MUTEX_UNLOCK (queue);
2819     gst_mini_object_unref (item);
2820
2821     return GST_FLOW_EOS;
2822   }
2823 }
2824
2825 static GstFlowReturn
2826 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2827 {
2828   GstQueue2 *queue;
2829
2830   queue = GST_QUEUE2 (parent);
2831
2832   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2833       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2834       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2835       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2836       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2837
2838   return gst_queue2_chain_buffer_or_buffer_list (queue,
2839       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2840 }
2841
2842 static GstFlowReturn
2843 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2844     GstBufferList * buffer_list)
2845 {
2846   GstQueue2 *queue;
2847
2848   queue = GST_QUEUE2 (parent);
2849
2850   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2851       "received buffer list %p", buffer_list);
2852
2853   return gst_queue2_chain_buffer_or_buffer_list (queue,
2854       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2855 }
2856
2857 static GstMiniObject *
2858 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2859 {
2860   GstMiniObject *data;
2861
2862   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2863
2864   /* stop pushing buffers, we dequeue all items until we see an item that we
2865    * can push again, which is EOS or SEGMENT. If there is nothing in the
2866    * queue we can push, we set a flag to make the sinkpad refuse more
2867    * buffers with an EOS return value until we receive something
2868    * pushable again or we get flushed. */
2869   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2870     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2871       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2872           "dropping EOS buffer %p", data);
2873       gst_buffer_unref (GST_BUFFER_CAST (data));
2874     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2875       GstEvent *event = GST_EVENT_CAST (data);
2876       GstEventType type = GST_EVENT_TYPE (event);
2877
2878       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
2879           || type == GST_EVENT_STREAM_START) {
2880         /* we found a pushable item in the queue, push it out */
2881         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2882             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2883         return data;
2884       }
2885       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2886           "dropping EOS event %p", event);
2887       gst_event_unref (event);
2888     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2889       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2890           "dropping EOS buffer list %p", data);
2891       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2892     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2893       queue->last_query = FALSE;
2894       g_cond_signal (&queue->query_handled);
2895       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2896     }
2897   }
2898   /* no more items in the queue. Set the unexpected flag so that upstream
2899    * make us refuse any more buffers on the sinkpad. Since we will still
2900    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2901    * task function does not shut down. */
2902   queue->unexpected = TRUE;
2903   return NULL;
2904 }
2905
2906 /* dequeue an item from the queue an push it downstream. This functions returns
2907  * the result of the push. */
2908 static GstFlowReturn
2909 gst_queue2_push_one (GstQueue2 * queue)
2910 {
2911   GstFlowReturn result = queue->srcresult;
2912   GstMiniObject *data;
2913   GstQueue2ItemType item_type;
2914
2915   data = gst_queue2_locked_dequeue (queue, &item_type);
2916   if (data == NULL)
2917     goto no_item;
2918
2919 next:
2920   STATUS (queue, queue->srcpad, "We have something dequeud");
2921   g_atomic_int_set (&queue->downstream_may_block,
2922       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2923       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2924   GST_QUEUE2_MUTEX_UNLOCK (queue);
2925   gst_queue2_post_buffering (queue);
2926
2927   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2928     GstBuffer *buffer;
2929
2930     buffer = GST_BUFFER_CAST (data);
2931
2932     result = gst_pad_push (queue->srcpad, buffer);
2933     g_atomic_int_set (&queue->downstream_may_block, 0);
2934
2935     /* need to check for srcresult here as well */
2936     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2937     if (result == GST_FLOW_EOS) {
2938       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2939       if (data != NULL)
2940         goto next;
2941       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2942        * to the caller so that the task function does not shut down */
2943       result = GST_FLOW_OK;
2944     }
2945   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2946     GstEvent *event = GST_EVENT_CAST (data);
2947     GstEventType type = GST_EVENT_TYPE (event);
2948
2949     if (type == GST_EVENT_TAG) {
2950       if (queue->use_tags_bitrate) {
2951         GstTagList *tags;
2952         guint bitrate;
2953
2954         gst_event_parse_tag (event, &tags);
2955         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2956             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2957           GST_QUEUE2_MUTEX_LOCK (queue);
2958           queue->src_tags_bitrate = bitrate;
2959           GST_QUEUE2_MUTEX_UNLOCK (queue);
2960           GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
2961         }
2962       }
2963     }
2964
2965     gst_pad_push_event (queue->srcpad, event);
2966
2967     /* if we're EOS, return EOS so that the task pauses. */
2968     if (type == GST_EVENT_EOS) {
2969       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2970           "pushed EOS event %p, return EOS", event);
2971       result = GST_FLOW_EOS;
2972     }
2973
2974     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2975   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2976     GstBufferList *buffer_list;
2977
2978     buffer_list = GST_BUFFER_LIST_CAST (data);
2979
2980     result = gst_pad_push_list (queue->srcpad, buffer_list);
2981     g_atomic_int_set (&queue->downstream_may_block, 0);
2982
2983     /* need to check for srcresult here as well */
2984     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2985     if (result == GST_FLOW_EOS) {
2986       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2987       if (data != NULL)
2988         goto next;
2989       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2990        * to the caller so that the task function does not shut down */
2991       result = GST_FLOW_OK;
2992     }
2993   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2994     GstQuery *query = GST_QUERY_CAST (data);
2995
2996     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2997     queue->last_handled_query = query;
2998     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2999     GST_LOG_OBJECT (queue->srcpad, "Peered query");
3000     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3001         "did query %p, return %d", query, queue->last_query);
3002     g_cond_signal (&queue->query_handled);
3003     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3004     result = GST_FLOW_OK;
3005   }
3006   return result;
3007
3008   /* ERRORS */
3009 no_item:
3010   {
3011     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3012         "exit because we have no item in the queue");
3013     return GST_FLOW_ERROR;
3014   }
3015 out_flushing:
3016   {
3017     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are %s",
3018         gst_flow_get_name (queue->srcresult));
3019     return queue->srcresult;
3020   }
3021 }
3022
3023 /* called repeatedly with @pad as the source pad. This function should push out
3024  * data to the peer element. */
3025 static void
3026 gst_queue2_loop (GstPad * pad)
3027 {
3028   GstQueue2 *queue;
3029   GstFlowReturn ret;
3030
3031   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
3032
3033   /* have to lock for thread-safety */
3034   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3035
3036   if (gst_queue2_is_empty (queue)) {
3037     gboolean started;
3038
3039     /* pause the timer while we wait. The fact that we are waiting does not mean
3040      * the byterate on the output pad is lower */
3041     if ((started = queue->out_timer_started))
3042       g_timer_stop (queue->out_timer);
3043
3044     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
3045         "queue is empty, waiting for new data");
3046     do {
3047       /* Wait for data to be available, we could be unlocked because of a flush. */
3048       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
3049     }
3050     while (gst_queue2_is_empty (queue));
3051
3052     /* and continue if we were running before */
3053     if (started)
3054       g_timer_continue (queue->out_timer);
3055   }
3056   ret = gst_queue2_push_one (queue);
3057   queue->srcresult = ret;
3058   queue->sinkresult = ret;
3059   if (ret != GST_FLOW_OK)
3060     goto out_flushing;
3061
3062   GST_QUEUE2_MUTEX_UNLOCK (queue);
3063   gst_queue2_post_buffering (queue);
3064
3065   return;
3066
3067   /* ERRORS */
3068 out_flushing:
3069   {
3070     gboolean eos = queue->is_eos;
3071     GstFlowReturn ret = queue->srcresult;
3072
3073     gst_pad_pause_task (queue->srcpad);
3074     if (ret == GST_FLOW_FLUSHING) {
3075       gst_queue2_locked_flush (queue, FALSE, FALSE);
3076     } else {
3077       GST_QUEUE2_SIGNAL_DEL (queue);
3078       queue->last_query = FALSE;
3079       g_cond_signal (&queue->query_handled);
3080     }
3081     GST_QUEUE2_MUTEX_UNLOCK (queue);
3082     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3083         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
3084     /* Recalculate buffering levels before stopping since the source flow
3085      * might cause a different buffering level (like NOT_LINKED making
3086      * the queue appear as full) */
3087     if (queue->use_buffering)
3088       update_buffering (queue);
3089     gst_queue2_post_buffering (queue);
3090     /* let app know about us giving up if upstream is not expected to do so */
3091     /* EOS is already taken care of elsewhere */
3092     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
3093       GST_ELEMENT_FLOW_ERROR (queue, ret);
3094       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
3095     }
3096     return;
3097   }
3098 }
3099
3100 static gboolean
3101 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3102 {
3103   gboolean res = TRUE;
3104   GstQueue2 *queue = GST_QUEUE2 (parent);
3105
3106 #ifndef GST_DISABLE_GST_DEBUG
3107   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
3108       event, GST_EVENT_TYPE_NAME (event));
3109 #endif
3110
3111   switch (GST_EVENT_TYPE (event)) {
3112     case GST_EVENT_FLUSH_START:
3113       if (QUEUE_IS_USING_QUEUE (queue)) {
3114         /* just forward upstream */
3115         res = gst_pad_push_event (queue->sinkpad, event);
3116       } else {
3117         /* now unblock the getrange function */
3118         GST_QUEUE2_MUTEX_LOCK (queue);
3119         GST_DEBUG_OBJECT (queue, "flushing");
3120         queue->srcresult = GST_FLOW_FLUSHING;
3121         GST_QUEUE2_SIGNAL_ADD (queue);
3122         GST_QUEUE2_MUTEX_UNLOCK (queue);
3123
3124         /* when using a temp file, we eat the event */
3125         res = TRUE;
3126         gst_event_unref (event);
3127       }
3128       break;
3129     case GST_EVENT_FLUSH_STOP:
3130       if (QUEUE_IS_USING_QUEUE (queue)) {
3131         /* just forward upstream */
3132         res = gst_pad_push_event (queue->sinkpad, event);
3133       } else {
3134         /* now unblock the getrange function */
3135         GST_QUEUE2_MUTEX_LOCK (queue);
3136         queue->srcresult = GST_FLOW_OK;
3137         GST_QUEUE2_MUTEX_UNLOCK (queue);
3138
3139         /* when using a temp file, we eat the event */
3140         res = TRUE;
3141         gst_event_unref (event);
3142       }
3143       break;
3144     case GST_EVENT_RECONFIGURE:
3145       GST_QUEUE2_MUTEX_LOCK (queue);
3146       /* assume downstream is linked now and try to push again */
3147       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
3148         queue->srcresult = GST_FLOW_OK;
3149         queue->sinkresult = GST_FLOW_OK;
3150         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
3151           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
3152               NULL);
3153         }
3154       }
3155       GST_QUEUE2_MUTEX_UNLOCK (queue);
3156
3157       res = gst_pad_push_event (queue->sinkpad, event);
3158       break;
3159     default:
3160       res = gst_pad_push_event (queue->sinkpad, event);
3161       break;
3162   }
3163
3164   return res;
3165 }
3166
3167 static gboolean
3168 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3169 {
3170   GstQueue2 *queue;
3171
3172   queue = GST_QUEUE2 (parent);
3173
3174   switch (GST_QUERY_TYPE (query)) {
3175     case GST_QUERY_POSITION:
3176     {
3177       gint64 peer_pos;
3178       GstFormat format;
3179
3180       if (!gst_pad_peer_query (queue->sinkpad, query))
3181         goto peer_failed;
3182
3183       /* get peer position */
3184       gst_query_parse_position (query, &format, &peer_pos);
3185
3186       /* FIXME: this code assumes that there's no discont in the queue */
3187       switch (format) {
3188         case GST_FORMAT_BYTES:
3189           peer_pos -= queue->cur_level.bytes;
3190           if (peer_pos < 0)     /* Clamp result to 0 */
3191             peer_pos = 0;
3192           break;
3193         case GST_FORMAT_TIME:
3194           peer_pos -= queue->cur_level.time;
3195           if (peer_pos < 0)     /* Clamp result to 0 */
3196             peer_pos = 0;
3197           break;
3198         default:
3199           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3200               "know how to adjust value", gst_format_get_name (format));
3201           return FALSE;
3202       }
3203       /* set updated position */
3204       gst_query_set_position (query, format, peer_pos);
3205       break;
3206     }
3207     case GST_QUERY_DURATION:
3208     {
3209       GST_DEBUG_OBJECT (queue, "doing peer query");
3210
3211       if (!gst_pad_peer_query (queue->sinkpad, query))
3212         goto peer_failed;
3213
3214       GST_DEBUG_OBJECT (queue, "peer query success");
3215       break;
3216     }
3217     case GST_QUERY_BUFFERING:
3218     {
3219       gint percent;
3220       gboolean is_buffering;
3221       GstBufferingMode mode;
3222       gint avg_in, avg_out;
3223       gint64 buffering_left;
3224
3225       GST_DEBUG_OBJECT (queue, "query buffering");
3226
3227       get_buffering_level (queue, &is_buffering, &percent);
3228       percent = convert_to_buffering_percent (queue, percent);
3229       gst_query_set_buffering_percent (query, is_buffering, percent);
3230
3231       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3232           &buffering_left);
3233       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3234           buffering_left);
3235
3236       if (!QUEUE_IS_USING_QUEUE (queue)) {
3237         /* add ranges for download and ringbuffer buffering */
3238         GstFormat format;
3239         gint64 start, stop, range_start, range_stop;
3240         guint64 writing_pos;
3241         gint64 estimated_total;
3242         gint64 duration;
3243         gboolean peer_res, is_eos;
3244         GstQueue2Range *queued_ranges;
3245
3246         /* we need a current download region */
3247         if (queue->current == NULL)
3248           return FALSE;
3249
3250         writing_pos = queue->current->writing_pos;
3251         is_eos = queue->is_eos;
3252
3253         if (is_eos) {
3254           /* we're EOS, we know the duration in bytes now */
3255           peer_res = TRUE;
3256           duration = writing_pos;
3257         } else {
3258           /* get duration of upstream in bytes */
3259           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3260               GST_FORMAT_BYTES, &duration);
3261         }
3262
3263         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3264             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3265
3266         /* calculate remaining and total download time */
3267         if (peer_res && avg_in > 0.0)
3268           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3269         else
3270           estimated_total = -1;
3271
3272         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3273             estimated_total);
3274
3275         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3276
3277         switch (format) {
3278           case GST_FORMAT_PERCENT:
3279             /* we need duration */
3280             if (!peer_res)
3281               goto peer_failed;
3282
3283             start = 0;
3284             /* get our available data relative to the duration */
3285             if (duration != -1)
3286               stop =
3287                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3288                   duration);
3289             else
3290               stop = -1;
3291             break;
3292           case GST_FORMAT_BYTES:
3293             start = 0;
3294             stop = writing_pos;
3295             break;
3296           default:
3297             start = -1;
3298             stop = -1;
3299             break;
3300         }
3301
3302         /* fill out the buffered ranges */
3303         for (queued_ranges = queue->ranges; queued_ranges;
3304             queued_ranges = queued_ranges->next) {
3305           switch (format) {
3306             case GST_FORMAT_PERCENT:
3307               if (duration == -1) {
3308                 range_start = 0;
3309                 range_stop = 0;
3310                 break;
3311               }
3312               range_start =
3313                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3314                   queued_ranges->offset, duration);
3315               range_stop =
3316                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3317                   queued_ranges->writing_pos, duration);
3318               break;
3319             case GST_FORMAT_BYTES:
3320               range_start = queued_ranges->offset;
3321               range_stop = queued_ranges->writing_pos;
3322               break;
3323             default:
3324               range_start = -1;
3325               range_stop = -1;
3326               break;
3327           }
3328           if (range_start == range_stop)
3329             continue;
3330           GST_DEBUG_OBJECT (queue,
3331               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3332               G_GINT64_FORMAT, range_start, range_stop);
3333           gst_query_add_buffering_range (query, range_start, range_stop);
3334         }
3335
3336         gst_query_set_buffering_range (query, format, start, stop,
3337             estimated_total);
3338       }
3339       break;
3340     }
3341     case GST_QUERY_SCHEDULING:
3342     {
3343       gboolean pull_mode;
3344       GstSchedulingFlags flags = 0;
3345
3346       if (!gst_pad_peer_query (queue->sinkpad, query))
3347         goto peer_failed;
3348
3349       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3350
3351       /* we can operate in pull mode when we are using a tempfile */
3352       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3353
3354       if (pull_mode)
3355         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3356       gst_query_set_scheduling (query, flags, 0, -1, 0);
3357       if (pull_mode)
3358         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3359       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3360       break;
3361     }
3362     default:
3363       /* peer handled other queries */
3364       if (!gst_pad_query_default (pad, parent, query))
3365         goto peer_failed;
3366       break;
3367   }
3368
3369   return TRUE;
3370
3371   /* ERRORS */
3372 peer_failed:
3373   {
3374     GST_DEBUG_OBJECT (queue, "failed peer query");
3375     return FALSE;
3376   }
3377 }
3378
3379 static gboolean
3380 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3381 {
3382   GstQueue2 *queue = GST_QUEUE2 (element);
3383
3384   /* simply forward to the srcpad query function */
3385   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3386       query);
3387 }
3388
3389 static void
3390 gst_queue2_update_upstream_size (GstQueue2 * queue)
3391 {
3392   gint64 upstream_size = -1;
3393
3394   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3395           &upstream_size)) {
3396     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3397
3398     /* upstream_size can be negative but queue->upstream_size is unsigned.
3399      * Prevent setting negative values to it (the query can return -1) */
3400     if (upstream_size >= 0)
3401       queue->upstream_size = upstream_size;
3402     else
3403       queue->upstream_size = 0;
3404   }
3405 }
3406
3407 static GstFlowReturn
3408 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3409     guint length, GstBuffer ** buffer)
3410 {
3411   GstQueue2 *queue;
3412   GstFlowReturn ret;
3413
3414   queue = GST_QUEUE2_CAST (parent);
3415
3416   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3417   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3418   offset = (offset == -1) ? queue->current->reading_pos : offset;
3419
3420   GST_DEBUG_OBJECT (queue,
3421       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3422
3423   /* catch any reads beyond the size of the file here to make sure queue2
3424    * doesn't send seek events beyond the size of the file upstream, since
3425    * that would confuse elements such as souphttpsrc and/or http servers.
3426    * Demuxers often just loop until EOS at the end of the file to figure out
3427    * when they've read all the end-headers or index chunks. */
3428   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3429     gst_queue2_update_upstream_size (queue);
3430     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3431       goto out_unexpected;
3432   }
3433
3434   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3435     gst_queue2_update_upstream_size (queue);
3436     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3437       length = queue->upstream_size - offset;
3438       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3439     }
3440   }
3441
3442   /* FIXME - function will block when the range is not yet available */
3443   ret = gst_queue2_create_read (queue, offset, length, buffer);
3444   GST_QUEUE2_MUTEX_UNLOCK (queue);
3445   gst_queue2_post_buffering (queue);
3446
3447   return ret;
3448
3449   /* ERRORS */
3450 out_flushing:
3451   {
3452     ret = queue->srcresult;
3453
3454     GST_DEBUG_OBJECT (queue, "we are %s", gst_flow_get_name (ret));
3455     GST_QUEUE2_MUTEX_UNLOCK (queue);
3456     return ret;
3457   }
3458 out_unexpected:
3459   {
3460     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3461     GST_QUEUE2_MUTEX_UNLOCK (queue);
3462     return GST_FLOW_EOS;
3463   }
3464 }
3465
3466 /* sink currently only operates in push mode */
3467 static gboolean
3468 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3469     GstPadMode mode, gboolean active)
3470 {
3471   gboolean result;
3472   GstQueue2 *queue;
3473
3474   queue = GST_QUEUE2 (parent);
3475
3476   switch (mode) {
3477     case GST_PAD_MODE_PUSH:
3478       if (active) {
3479         GST_QUEUE2_MUTEX_LOCK (queue);
3480         GST_DEBUG_OBJECT (queue, "activating push mode");
3481         queue->srcresult = GST_FLOW_OK;
3482         queue->sinkresult = GST_FLOW_OK;
3483         queue->is_eos = FALSE;
3484         queue->unexpected = FALSE;
3485         reset_rate_timer (queue);
3486         GST_QUEUE2_MUTEX_UNLOCK (queue);
3487       } else {
3488         /* unblock chain function */
3489         GST_QUEUE2_MUTEX_LOCK (queue);
3490         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3491         queue->srcresult = GST_FLOW_FLUSHING;
3492         queue->sinkresult = GST_FLOW_FLUSHING;
3493         GST_QUEUE2_SIGNAL_DEL (queue);
3494         GST_QUEUE2_MUTEX_UNLOCK (queue);
3495
3496         /* wait until it is unblocked and clean up */
3497         GST_PAD_STREAM_LOCK (pad);
3498         GST_QUEUE2_MUTEX_LOCK (queue);
3499         gst_queue2_locked_flush (queue, TRUE, FALSE);
3500         GST_QUEUE2_MUTEX_UNLOCK (queue);
3501         GST_PAD_STREAM_UNLOCK (pad);
3502       }
3503       result = TRUE;
3504       break;
3505     default:
3506       result = FALSE;
3507       break;
3508   }
3509   return result;
3510 }
3511
3512 /* src operating in push mode, we start a task on the source pad that pushes out
3513  * buffers from the queue */
3514 static gboolean
3515 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3516 {
3517   gboolean result = FALSE;
3518   GstQueue2 *queue;
3519
3520   queue = GST_QUEUE2 (parent);
3521
3522   if (active) {
3523     GST_QUEUE2_MUTEX_LOCK (queue);
3524     GST_DEBUG_OBJECT (queue, "activating push mode");
3525     queue->srcresult = GST_FLOW_OK;
3526     queue->sinkresult = GST_FLOW_OK;
3527     queue->is_eos = FALSE;
3528     queue->unexpected = FALSE;
3529     result =
3530         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3531     GST_QUEUE2_MUTEX_UNLOCK (queue);
3532   } else {
3533     /* unblock loop function */
3534     GST_QUEUE2_MUTEX_LOCK (queue);
3535     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3536     queue->srcresult = GST_FLOW_FLUSHING;
3537     queue->sinkresult = GST_FLOW_FLUSHING;
3538     /* the item add signal will unblock */
3539     GST_QUEUE2_SIGNAL_ADD (queue);
3540     GST_QUEUE2_MUTEX_UNLOCK (queue);
3541
3542     /* step 2, make sure streaming finishes */
3543     result = gst_pad_stop_task (pad);
3544
3545     GST_QUEUE2_MUTEX_LOCK (queue);
3546     gst_queue2_locked_flush (queue, FALSE, FALSE);
3547     GST_QUEUE2_MUTEX_UNLOCK (queue);
3548   }
3549
3550   return result;
3551 }
3552
3553 /* pull mode, downstream will call our getrange function */
3554 static gboolean
3555 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3556 {
3557   gboolean result;
3558   GstQueue2 *queue;
3559
3560   queue = GST_QUEUE2 (parent);
3561
3562   if (active) {
3563     GST_QUEUE2_MUTEX_LOCK (queue);
3564     if (!QUEUE_IS_USING_QUEUE (queue)) {
3565       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3566         /* open the temp file now */
3567         result = gst_queue2_open_temp_location_file (queue);
3568       } else if (!queue->ring_buffer) {
3569         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3570         result = ! !queue->ring_buffer;
3571       } else {
3572         result = TRUE;
3573       }
3574
3575       GST_DEBUG_OBJECT (queue, "activating pull mode");
3576       init_ranges (queue);
3577       queue->srcresult = GST_FLOW_OK;
3578       queue->sinkresult = GST_FLOW_OK;
3579       queue->is_eos = FALSE;
3580       queue->unexpected = FALSE;
3581       queue->upstream_size = 0;
3582     } else {
3583       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3584       /* this is not allowed, we cannot operate in pull mode without a temp
3585        * file. */
3586       queue->srcresult = GST_FLOW_FLUSHING;
3587       queue->sinkresult = GST_FLOW_FLUSHING;
3588       result = FALSE;
3589     }
3590     GST_QUEUE2_MUTEX_UNLOCK (queue);
3591   } else {
3592     GST_QUEUE2_MUTEX_LOCK (queue);
3593     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3594     queue->srcresult = GST_FLOW_FLUSHING;
3595     queue->sinkresult = GST_FLOW_FLUSHING;
3596     /* this will unlock getrange */
3597     GST_QUEUE2_SIGNAL_ADD (queue);
3598     result = TRUE;
3599     GST_QUEUE2_MUTEX_UNLOCK (queue);
3600   }
3601
3602   return result;
3603 }
3604
3605 static gboolean
3606 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3607     gboolean active)
3608 {
3609   gboolean res;
3610
3611   switch (mode) {
3612     case GST_PAD_MODE_PULL:
3613       res = gst_queue2_src_activate_pull (pad, parent, active);
3614       break;
3615     case GST_PAD_MODE_PUSH:
3616       res = gst_queue2_src_activate_push (pad, parent, active);
3617       break;
3618     default:
3619       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3620       res = FALSE;
3621       break;
3622   }
3623   return res;
3624 }
3625
3626 static GstStateChangeReturn
3627 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3628 {
3629   GstQueue2 *queue;
3630   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3631
3632   queue = GST_QUEUE2 (element);
3633
3634   switch (transition) {
3635     case GST_STATE_CHANGE_NULL_TO_READY:
3636       break;
3637     case GST_STATE_CHANGE_READY_TO_PAUSED:
3638       GST_QUEUE2_MUTEX_LOCK (queue);
3639       if (!QUEUE_IS_USING_QUEUE (queue)) {
3640         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3641           if (!gst_queue2_open_temp_location_file (queue))
3642             ret = GST_STATE_CHANGE_FAILURE;
3643         } else {
3644           if (queue->ring_buffer) {
3645             g_free (queue->ring_buffer);
3646             queue->ring_buffer = NULL;
3647           }
3648           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3649             ret = GST_STATE_CHANGE_FAILURE;
3650         }
3651         init_ranges (queue);
3652       }
3653       queue->segment_event_received = FALSE;
3654       queue->starting_segment = NULL;
3655       gst_event_replace (&queue->stream_start_event, NULL);
3656       GST_QUEUE2_MUTEX_UNLOCK (queue);
3657       break;
3658     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3659       break;
3660     default:
3661       break;
3662   }
3663
3664   if (ret == GST_STATE_CHANGE_FAILURE)
3665     return ret;
3666
3667   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3668
3669   if (ret == GST_STATE_CHANGE_FAILURE)
3670     return ret;
3671
3672   switch (transition) {
3673     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3674       break;
3675     case GST_STATE_CHANGE_PAUSED_TO_READY:
3676       GST_QUEUE2_MUTEX_LOCK (queue);
3677       if (!QUEUE_IS_USING_QUEUE (queue)) {
3678         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3679           gst_queue2_close_temp_location_file (queue);
3680         } else if (queue->ring_buffer) {
3681           g_free (queue->ring_buffer);
3682           queue->ring_buffer = NULL;
3683         }
3684         clean_ranges (queue);
3685       }
3686       if (queue->starting_segment != NULL) {
3687         gst_event_unref (queue->starting_segment);
3688         queue->starting_segment = NULL;
3689       }
3690       gst_event_replace (&queue->stream_start_event, NULL);
3691       GST_QUEUE2_MUTEX_UNLOCK (queue);
3692       break;
3693     case GST_STATE_CHANGE_READY_TO_NULL:
3694       break;
3695     default:
3696       break;
3697   }
3698
3699   return ret;
3700 }
3701
3702 /* changing the capacity of the queue must wake up
3703  * the _chain function, it might have more room now
3704  * to store the buffer/event in the queue */
3705 #define QUEUE_CAPACITY_CHANGE(q) \
3706   GST_QUEUE2_SIGNAL_DEL (queue); \
3707   if (queue->use_buffering)      \
3708     update_buffering (queue);
3709
3710 /* Changing the minimum required fill level must
3711  * wake up the _loop function as it might now
3712  * be able to preceed.
3713  */
3714 #define QUEUE_THRESHOLD_CHANGE(q)\
3715   GST_QUEUE2_SIGNAL_ADD (queue);
3716
3717 static void
3718 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3719 {
3720   GstState state;
3721
3722   /* the element must be stopped in order to do this */
3723   GST_OBJECT_LOCK (queue);
3724   state = GST_STATE (queue);
3725   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3726     goto wrong_state;
3727   GST_OBJECT_UNLOCK (queue);
3728
3729   /* set new location */
3730   g_free (queue->temp_template);
3731   queue->temp_template = g_strdup (template);
3732
3733   return;
3734
3735 /* ERROR */
3736 wrong_state:
3737   {
3738     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3739     GST_OBJECT_UNLOCK (queue);
3740   }
3741 }
3742
3743 static void
3744 gst_queue2_set_property (GObject * object,
3745     guint prop_id, const GValue * value, GParamSpec * pspec)
3746 {
3747   GstQueue2 *queue = GST_QUEUE2 (object);
3748
3749   /* someone could change levels here, and since this
3750    * affects the get/put funcs, we need to lock for safety. */
3751   GST_QUEUE2_MUTEX_LOCK (queue);
3752
3753   switch (prop_id) {
3754     case PROP_MAX_SIZE_BYTES:
3755       queue->max_level.bytes = g_value_get_uint (value);
3756       QUEUE_CAPACITY_CHANGE (queue);
3757       break;
3758     case PROP_MAX_SIZE_BUFFERS:
3759       queue->max_level.buffers = g_value_get_uint (value);
3760       QUEUE_CAPACITY_CHANGE (queue);
3761       break;
3762     case PROP_MAX_SIZE_TIME:
3763       queue->max_level.time = g_value_get_uint64 (value);
3764       /* set rate_time to the same value. We use an extra field in the level
3765        * structure so that we can easily access and compare it */
3766       queue->max_level.rate_time = queue->max_level.time;
3767       QUEUE_CAPACITY_CHANGE (queue);
3768       break;
3769     case PROP_USE_BUFFERING:
3770       queue->use_buffering = g_value_get_boolean (value);
3771       if (!queue->use_buffering && queue->is_buffering) {
3772         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3773             "posting 100%% message");
3774         SET_PERCENT (queue, 100);
3775         queue->is_buffering = FALSE;
3776       }
3777
3778       if (queue->use_buffering) {
3779         queue->is_buffering = TRUE;
3780         update_buffering (queue);
3781       }
3782       break;
3783     case PROP_USE_TAGS_BITRATE:
3784       queue->use_tags_bitrate = g_value_get_boolean (value);
3785       break;
3786     case PROP_USE_RATE_ESTIMATE:
3787       queue->use_rate_estimate = g_value_get_boolean (value);
3788       break;
3789     case PROP_LOW_PERCENT:
3790       queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3791       if (queue->is_buffering)
3792         update_buffering (queue);
3793       break;
3794     case PROP_HIGH_PERCENT:
3795       queue->high_watermark =
3796           g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3797       if (queue->is_buffering)
3798         update_buffering (queue);
3799       break;
3800     case PROP_LOW_WATERMARK:
3801       queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3802       if (queue->is_buffering)
3803         update_buffering (queue);
3804       break;
3805     case PROP_HIGH_WATERMARK:
3806       queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3807       if (queue->is_buffering)
3808         update_buffering (queue);
3809       break;
3810     case PROP_TEMP_TEMPLATE:
3811       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3812       break;
3813     case PROP_TEMP_REMOVE:
3814       queue->temp_remove = g_value_get_boolean (value);
3815       break;
3816     case PROP_RING_BUFFER_MAX_SIZE:
3817       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3818       break;
3819     default:
3820       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3821       break;
3822   }
3823
3824   GST_QUEUE2_MUTEX_UNLOCK (queue);
3825   gst_queue2_post_buffering (queue);
3826 }
3827
3828 static void
3829 gst_queue2_get_property (GObject * object,
3830     guint prop_id, GValue * value, GParamSpec * pspec)
3831 {
3832   GstQueue2 *queue = GST_QUEUE2 (object);
3833
3834   GST_QUEUE2_MUTEX_LOCK (queue);
3835
3836   switch (prop_id) {
3837     case PROP_CUR_LEVEL_BYTES:
3838       g_value_set_uint (value, queue->cur_level.bytes);
3839       break;
3840     case PROP_CUR_LEVEL_BUFFERS:
3841       g_value_set_uint (value, queue->cur_level.buffers);
3842       break;
3843     case PROP_CUR_LEVEL_TIME:
3844       g_value_set_uint64 (value, queue->cur_level.time);
3845       break;
3846     case PROP_MAX_SIZE_BYTES:
3847       g_value_set_uint (value, queue->max_level.bytes);
3848       break;
3849     case PROP_MAX_SIZE_BUFFERS:
3850       g_value_set_uint (value, queue->max_level.buffers);
3851       break;
3852     case PROP_MAX_SIZE_TIME:
3853       g_value_set_uint64 (value, queue->max_level.time);
3854       break;
3855     case PROP_USE_BUFFERING:
3856       g_value_set_boolean (value, queue->use_buffering);
3857       break;
3858     case PROP_USE_TAGS_BITRATE:
3859       g_value_set_boolean (value, queue->use_tags_bitrate);
3860       break;
3861     case PROP_USE_RATE_ESTIMATE:
3862       g_value_set_boolean (value, queue->use_rate_estimate);
3863       break;
3864     case PROP_LOW_PERCENT:
3865       g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
3866       break;
3867     case PROP_HIGH_PERCENT:
3868       g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
3869       break;
3870     case PROP_LOW_WATERMARK:
3871       g_value_set_double (value, queue->low_watermark /
3872           (gdouble) MAX_BUFFERING_LEVEL);
3873       break;
3874     case PROP_HIGH_WATERMARK:
3875       g_value_set_double (value, queue->high_watermark /
3876           (gdouble) MAX_BUFFERING_LEVEL);
3877       break;
3878     case PROP_TEMP_TEMPLATE:
3879       g_value_set_string (value, queue->temp_template);
3880       break;
3881     case PROP_TEMP_LOCATION:
3882       g_value_set_string (value, queue->temp_location);
3883       break;
3884     case PROP_TEMP_REMOVE:
3885       g_value_set_boolean (value, queue->temp_remove);
3886       break;
3887     case PROP_RING_BUFFER_MAX_SIZE:
3888       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3889       break;
3890     case PROP_AVG_IN_RATE:
3891     {
3892       gdouble in_rate = queue->byte_in_rate;
3893
3894       /* During the first RATE_INTERVAL, byte_in_rate will not have been
3895        * calculated, so calculate it here. */
3896       if (in_rate == 0.0 && queue->bytes_in
3897           && queue->last_update_in_rates_elapsed > 0.0)
3898         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
3899
3900       g_value_set_int64 (value, (gint64) in_rate);
3901       break;
3902     }
3903     default:
3904       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3905       break;
3906   }
3907
3908   GST_QUEUE2_MUTEX_UNLOCK (queue);
3909 }