queue2: don't print criticals when receiving custom events in ring buffer mode
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue2:current-level-buffers property.
40  *
41  * The default queue size limits are 100 buffers, 2MB of data, or
42  * two seconds worth of data, whichever is reached first.
43  *
44  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
45  * will allocate a random free filename and buffer data in the file.
46  * By using this, it will buffer the entire stream data on the file independently
47  * of the queue size limits, they will only be used for buffering statistics.
48  *
49  * The temp-location property will be used to notify the application of the
50  * allocated filename.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56
57 #include "gstqueue2.h"
58
59 #include <glib/gstdio.h>
60
61 #include "gst/gst-i18n-lib.h"
62 #include "gst/glib-compat-private.h"
63
64 #include <string.h>
65
66 #ifdef G_OS_WIN32
67 #include <io.h>                 /* lseek, open, close, read */
68 #undef lseek
69 #define lseek _lseeki64
70 #undef off_t
71 #define off_t guint64
72 #else
73 #include <unistd.h>
74 #endif
75
76 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
77     GST_PAD_SINK,
78     GST_PAD_ALWAYS,
79     GST_STATIC_CAPS_ANY);
80
81 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
82     GST_PAD_SRC,
83     GST_PAD_ALWAYS,
84     GST_STATIC_CAPS_ANY);
85
86 GST_DEBUG_CATEGORY_STATIC (queue_debug);
87 #define GST_CAT_DEFAULT (queue_debug)
88 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
89
90 enum
91 {
92   SIGNAL_OVERRUN,
93   LAST_SIGNAL
94 };
95
96 /* other defines */
97 #define DEFAULT_BUFFER_SIZE 4096
98 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
99 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
100 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
101
102 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
103
104 /* default property values */
105 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
106 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
107 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
108 #define DEFAULT_USE_BUFFERING      FALSE
109 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
110 #define DEFAULT_LOW_PERCENT        10
111 #define DEFAULT_HIGH_PERCENT       99
112 #define DEFAULT_TEMP_REMOVE        TRUE
113 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
114
115 enum
116 {
117   PROP_0,
118   PROP_CUR_LEVEL_BUFFERS,
119   PROP_CUR_LEVEL_BYTES,
120   PROP_CUR_LEVEL_TIME,
121   PROP_MAX_SIZE_BUFFERS,
122   PROP_MAX_SIZE_BYTES,
123   PROP_MAX_SIZE_TIME,
124   PROP_USE_BUFFERING,
125   PROP_USE_RATE_ESTIMATE,
126   PROP_LOW_PERCENT,
127   PROP_HIGH_PERCENT,
128   PROP_TEMP_TEMPLATE,
129   PROP_TEMP_LOCATION,
130   PROP_TEMP_REMOVE,
131   PROP_RING_BUFFER_MAX_SIZE,
132   PROP_AVG_IN_RATE,
133   PROP_LAST
134 };
135
136 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
137   l.buffers = 0;                                        \
138   l.bytes = 0;                                          \
139   l.time = 0;                                           \
140   l.rate_time = 0;                                      \
141 } G_STMT_END
142
143 #define STATUS(queue, pad, msg) \
144   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
145                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
146                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
147                       " ns, %"G_GUINT64_FORMAT" items", \
148                       GST_DEBUG_PAD_NAME (pad), \
149                       queue->cur_level.buffers, \
150                       queue->max_level.buffers, \
151                       queue->cur_level.bytes, \
152                       queue->max_level.bytes, \
153                       queue->cur_level.time, \
154                       queue->max_level.time, \
155                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
156                         queue->current->writing_pos - queue->current->max_reading_pos : \
157                         queue->queue.length))
158
159 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
160   g_mutex_lock (&q->qlock);                                              \
161 } G_STMT_END
162
163 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
164   GST_QUEUE2_MUTEX_LOCK (q);                                            \
165   if (res != GST_FLOW_OK)                                               \
166     goto label;                                                         \
167 } G_STMT_END
168
169 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
170   g_mutex_unlock (&q->qlock);                                            \
171 } G_STMT_END
172
173 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
174   STATUS (queue, q->sinkpad, "wait for DEL");                           \
175   q->waiting_del = TRUE;                                                \
176   g_cond_wait (&q->item_del, &queue->qlock);                              \
177   q->waiting_del = FALSE;                                               \
178   if (res != GST_FLOW_OK) {                                             \
179     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
180     goto label;                                                         \
181   }                                                                     \
182   STATUS (queue, q->sinkpad, "received DEL");                           \
183 } G_STMT_END
184
185 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
186   STATUS (queue, q->srcpad, "wait for ADD");                            \
187   q->waiting_add = TRUE;                                                \
188   g_cond_wait (&q->item_add, &q->qlock);                                  \
189   q->waiting_add = FALSE;                                               \
190   if (res != GST_FLOW_OK) {                                             \
191     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
192     goto label;                                                         \
193   }                                                                     \
194   STATUS (queue, q->srcpad, "received ADD");                            \
195 } G_STMT_END
196
197 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
198   if (q->waiting_del) {                                                 \
199     STATUS (q, q->srcpad, "signal DEL");                                \
200     g_cond_signal (&q->item_del);                                        \
201   }                                                                     \
202 } G_STMT_END
203
204 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
205   if (q->waiting_add) {                                                 \
206     STATUS (q, q->sinkpad, "signal ADD");                               \
207     g_cond_signal (&q->item_add);                                        \
208   }                                                                     \
209 } G_STMT_END
210
211 #define SET_PERCENT(q, perc) G_STMT_START {                              \
212   if (perc != q->buffering_percent) {                                    \
213     q->buffering_percent = perc;                                         \
214     q->percent_changed = TRUE;                                           \
215     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
216     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
217         &q->buffering_left);                                             \
218   }                                                                      \
219 } G_STMT_END
220
221 #define _do_init \
222     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
223     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
224         "dataflow inside the queue element");
225 #define gst_queue2_parent_class parent_class
226 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
227
228 static void gst_queue2_finalize (GObject * object);
229
230 static void gst_queue2_set_property (GObject * object,
231     guint prop_id, const GValue * value, GParamSpec * pspec);
232 static void gst_queue2_get_property (GObject * object,
233     guint prop_id, GValue * value, GParamSpec * pspec);
234
235 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
236     GstBuffer * buffer);
237 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
238     GstBufferList * buffer_list);
239 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
240 static void gst_queue2_loop (GstPad * pad);
241
242 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
243     GstEvent * event);
244 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
245     GstQuery * query);
246
247 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
248     GstEvent * event);
249 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
250     GstQuery * query);
251 static gboolean gst_queue2_handle_query (GstElement * element,
252     GstQuery * query);
253
254 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
255     guint64 offset, guint length, GstBuffer ** buffer);
256
257 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
258     GstPadMode mode, gboolean active);
259 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
260     GstPadMode mode, gboolean active);
261 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
262     GstStateChange transition);
263
264 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
265 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
266
267 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
268 static void update_in_rates (GstQueue2 * queue);
269 static void gst_queue2_post_buffering (GstQueue2 * queue);
270
271 typedef enum
272 {
273   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
274   GST_QUEUE2_ITEM_TYPE_BUFFER,
275   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
276   GST_QUEUE2_ITEM_TYPE_EVENT,
277   GST_QUEUE2_ITEM_TYPE_QUERY
278 } GstQueue2ItemType;
279
280 typedef struct
281 {
282   GstQueue2ItemType type;
283   GstMiniObject *item;
284 } GstQueue2Item;
285
286 static guint gst_queue2_signals[LAST_SIGNAL] = { 0 };
287
288 static void
289 gst_queue2_class_init (GstQueue2Class * klass)
290 {
291   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
292   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
293
294   gobject_class->set_property = gst_queue2_set_property;
295   gobject_class->get_property = gst_queue2_get_property;
296
297   /* signals */
298   /**
299    * GstQueue2::overrun:
300    * @queue: the queue2 instance
301    *
302    * Reports that the buffer became full (overrun).
303    * A buffer is full if the total amount of data inside it (num-buffers, time,
304    * size) is higher than the boundary values which can be set through the
305    * GObject properties.
306    *
307    * Since: 1.8
308    */
309   gst_queue2_signals[SIGNAL_OVERRUN] =
310       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
311       G_STRUCT_OFFSET (GstQueue2Class, overrun), NULL, NULL,
312       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
313
314   /* properties */
315   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
316       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
317           "Current amount of data in the queue (bytes)",
318           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
319   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
320       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
321           "Current number of buffers in the queue",
322           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
323   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
324       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
325           "Current amount of data in the queue (in ns)",
326           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
327
328   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
329       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
330           "Max. amount of data in the queue (bytes, 0=disable)",
331           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
332           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
333           G_PARAM_STATIC_STRINGS));
334   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
335       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
336           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
337           DEFAULT_MAX_SIZE_BUFFERS,
338           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
339           G_PARAM_STATIC_STRINGS));
340   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
341       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
342           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
343           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
344           G_PARAM_STATIC_STRINGS));
345
346   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
347       g_param_spec_boolean ("use-buffering", "Use buffering",
348           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
349           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
350           G_PARAM_STATIC_STRINGS));
351   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
352       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
353           "Estimate the bitrate of the stream to calculate time level",
354           DEFAULT_USE_RATE_ESTIMATE,
355           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
357       g_param_spec_int ("low-percent", "Low percent",
358           "Low threshold for buffering to start. Only used if use-buffering is True",
359           0, 100, DEFAULT_LOW_PERCENT,
360           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
361   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
362       g_param_spec_int ("high-percent", "High percent",
363           "High threshold for buffering to finish. Only used if use-buffering is True",
364           0, 100, DEFAULT_HIGH_PERCENT,
365           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
366
367   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
368       g_param_spec_string ("temp-template", "Temporary File Template",
369           "File template to store temporary files in, should contain directory "
370           "and XXXXXX. (NULL == disabled)",
371           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
372
373   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
374       g_param_spec_string ("temp-location", "Temporary File Location",
375           "Location to store temporary files in (Only read this property, "
376           "use temp-template to configure the name template)",
377           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
378
379   /**
380    * GstQueue2:temp-remove
381    *
382    * When temp-template is set, remove the temporary file when going to READY.
383    */
384   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
385       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
386           "Remove the temp-location after use",
387           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
388
389   /**
390    * GstQueue2:ring-buffer-max-size
391    *
392    * The maximum size of the ring buffer in bytes. If set to 0, the ring
393    * buffer is disabled. Default 0.
394    */
395   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
396       g_param_spec_uint64 ("ring-buffer-max-size",
397           "Max. ring buffer size (bytes)",
398           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
399           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
400           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
401
402   /**
403    * GstQueue2:avg-in-rate
404    *
405    * The average input data rate.
406    */
407   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
408       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
409           "Average input data rate (bytes/s)",
410           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
411
412   /* set several parent class virtual functions */
413   gobject_class->finalize = gst_queue2_finalize;
414
415   gst_element_class_add_pad_template (gstelement_class,
416       gst_static_pad_template_get (&srctemplate));
417   gst_element_class_add_pad_template (gstelement_class,
418       gst_static_pad_template_get (&sinktemplate));
419
420   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
421       "Generic",
422       "Simple data queue",
423       "Erik Walthinsen <omega@cse.ogi.edu>, "
424       "Wim Taymans <wim.taymans@gmail.com>");
425
426   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
427   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
428 }
429
430 static void
431 gst_queue2_init (GstQueue2 * queue)
432 {
433   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
434
435   gst_pad_set_chain_function (queue->sinkpad,
436       GST_DEBUG_FUNCPTR (gst_queue2_chain));
437   gst_pad_set_chain_list_function (queue->sinkpad,
438       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
439   gst_pad_set_activatemode_function (queue->sinkpad,
440       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
441   gst_pad_set_event_function (queue->sinkpad,
442       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
443   gst_pad_set_query_function (queue->sinkpad,
444       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
445   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
446   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
447
448   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
449
450   gst_pad_set_activatemode_function (queue->srcpad,
451       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
452   gst_pad_set_getrange_function (queue->srcpad,
453       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
454   gst_pad_set_event_function (queue->srcpad,
455       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
456   gst_pad_set_query_function (queue->srcpad,
457       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
458   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
459   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
460
461   /* levels */
462   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
463   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
464   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
465   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
466   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
467   queue->use_buffering = DEFAULT_USE_BUFFERING;
468   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
469   queue->low_percent = DEFAULT_LOW_PERCENT;
470   queue->high_percent = DEFAULT_HIGH_PERCENT;
471
472   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
473   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
474
475   queue->sinktime = GST_CLOCK_TIME_NONE;
476   queue->srctime = GST_CLOCK_TIME_NONE;
477   queue->sink_tainted = TRUE;
478   queue->src_tainted = TRUE;
479
480   queue->srcresult = GST_FLOW_FLUSHING;
481   queue->sinkresult = GST_FLOW_FLUSHING;
482   queue->is_eos = FALSE;
483   queue->in_timer = g_timer_new ();
484   queue->out_timer = g_timer_new ();
485
486   g_mutex_init (&queue->qlock);
487   queue->waiting_add = FALSE;
488   g_cond_init (&queue->item_add);
489   queue->waiting_del = FALSE;
490   g_cond_init (&queue->item_del);
491   g_queue_init (&queue->queue);
492
493   g_cond_init (&queue->query_handled);
494   queue->last_query = FALSE;
495
496   g_mutex_init (&queue->buffering_post_lock);
497   queue->buffering_percent = 100;
498
499   /* tempfile related */
500   queue->temp_template = NULL;
501   queue->temp_location = NULL;
502   queue->temp_remove = DEFAULT_TEMP_REMOVE;
503
504   queue->ring_buffer = NULL;
505   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
506
507   GST_DEBUG_OBJECT (queue,
508       "initialized queue's not_empty & not_full conditions");
509 }
510
511 /* called only once, as opposed to dispose */
512 static void
513 gst_queue2_finalize (GObject * object)
514 {
515   GstQueue2 *queue = GST_QUEUE2 (object);
516
517   GST_DEBUG_OBJECT (queue, "finalizing queue");
518
519   while (!g_queue_is_empty (&queue->queue)) {
520     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
521
522     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
523       gst_mini_object_unref (qitem->item);
524     g_slice_free (GstQueue2Item, qitem);
525   }
526
527   queue->last_query = FALSE;
528   g_queue_clear (&queue->queue);
529   g_mutex_clear (&queue->qlock);
530   g_mutex_clear (&queue->buffering_post_lock);
531   g_cond_clear (&queue->item_add);
532   g_cond_clear (&queue->item_del);
533   g_cond_clear (&queue->query_handled);
534   g_timer_destroy (queue->in_timer);
535   g_timer_destroy (queue->out_timer);
536
537   /* temp_file path cleanup  */
538   g_free (queue->temp_template);
539   g_free (queue->temp_location);
540
541   G_OBJECT_CLASS (parent_class)->finalize (object);
542 }
543
544 static void
545 debug_ranges (GstQueue2 * queue)
546 {
547   GstQueue2Range *walk;
548
549   for (walk = queue->ranges; walk; walk = walk->next) {
550     GST_DEBUG_OBJECT (queue,
551         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
552         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
553         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
554         walk->rb_writing_pos, walk->reading_pos,
555         walk == queue->current ? "**y**" : "  n  ");
556   }
557 }
558
559 /* clear all the downloaded ranges */
560 static void
561 clean_ranges (GstQueue2 * queue)
562 {
563   GST_DEBUG_OBJECT (queue, "clean queue ranges");
564
565   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
566   queue->ranges = NULL;
567   queue->current = NULL;
568 }
569
570 /* find a range that contains @offset or NULL when nothing does */
571 static GstQueue2Range *
572 find_range (GstQueue2 * queue, guint64 offset)
573 {
574   GstQueue2Range *range = NULL;
575   GstQueue2Range *walk;
576
577   /* first do a quick check for the current range */
578   for (walk = queue->ranges; walk; walk = walk->next) {
579     if (offset >= walk->offset && offset <= walk->writing_pos) {
580       /* we can reuse an existing range */
581       range = walk;
582       break;
583     }
584   }
585   if (range) {
586     GST_DEBUG_OBJECT (queue,
587         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
588         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
589   } else {
590     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
591   }
592   return range;
593 }
594
595 static void
596 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
597 {
598   guint64 max_reading_pos, writing_pos;
599
600   writing_pos = range->writing_pos;
601   max_reading_pos = range->max_reading_pos;
602
603   if (writing_pos > max_reading_pos)
604     queue->cur_level.bytes = writing_pos - max_reading_pos;
605   else
606     queue->cur_level.bytes = 0;
607 }
608
609 /* make a new range for @offset or reuse an existing range */
610 static GstQueue2Range *
611 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
612 {
613   GstQueue2Range *range, *prev, *next;
614
615   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
616
617   if ((range = find_range (queue, offset))) {
618     GST_DEBUG_OBJECT (queue,
619         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
620         range->writing_pos);
621     if (update_existing && range->writing_pos != offset) {
622       GST_DEBUG_OBJECT (queue, "updating range writing position to "
623           "%" G_GUINT64_FORMAT, offset);
624       range->writing_pos = offset;
625     }
626   } else {
627     GST_DEBUG_OBJECT (queue,
628         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
629
630     range = g_slice_new0 (GstQueue2Range);
631     range->offset = offset;
632     /* we want to write to the next location in the ring buffer */
633     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
634     range->writing_pos = offset;
635     range->rb_writing_pos = range->rb_offset;
636     range->reading_pos = offset;
637     range->max_reading_pos = offset;
638
639     /* insert sorted */
640     prev = NULL;
641     next = queue->ranges;
642     while (next) {
643       if (next->offset > offset) {
644         /* insert before next */
645         GST_DEBUG_OBJECT (queue,
646             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
647             next->offset);
648         break;
649       }
650       /* try next */
651       prev = next;
652       next = next->next;
653     }
654     range->next = next;
655     if (prev)
656       prev->next = range;
657     else
658       queue->ranges = range;
659   }
660   debug_ranges (queue);
661
662   /* update the stats for this range */
663   update_cur_level (queue, range);
664
665   return range;
666 }
667
668
669 /* clear and init the download ranges for offset 0 */
670 static void
671 init_ranges (GstQueue2 * queue)
672 {
673   GST_DEBUG_OBJECT (queue, "init queue ranges");
674
675   /* get rid of all the current ranges */
676   clean_ranges (queue);
677   /* make a range for offset 0 */
678   queue->current = add_range (queue, 0, TRUE);
679 }
680
681 /* calculate the diff between running time on the sink and src of the queue.
682  * This is the total amount of time in the queue. */
683 static void
684 update_time_level (GstQueue2 * queue)
685 {
686   if (queue->sink_tainted) {
687     queue->sinktime =
688         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
689         queue->sink_segment.position);
690     queue->sink_tainted = FALSE;
691   }
692
693   if (queue->src_tainted) {
694     queue->srctime =
695         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
696         queue->src_segment.position);
697     queue->src_tainted = FALSE;
698   }
699
700   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
701       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
702
703   if (queue->sinktime != GST_CLOCK_TIME_NONE
704       && queue->srctime != GST_CLOCK_TIME_NONE
705       && queue->sinktime >= queue->srctime)
706     queue->cur_level.time = queue->sinktime - queue->srctime;
707   else
708     queue->cur_level.time = 0;
709 }
710
711 /* take a SEGMENT event and apply the values to segment, updating the time
712  * level of queue. */
713 static void
714 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
715     gboolean is_sink)
716 {
717   gst_event_copy_segment (event, segment);
718
719   if (segment->format == GST_FORMAT_BYTES) {
720     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
721       /* start is where we'll be getting from and as such writing next */
722       queue->current = add_range (queue, segment->start, TRUE);
723     }
724   }
725
726   /* now configure the values, we use these to track timestamps on the
727    * sinkpad. */
728   if (segment->format != GST_FORMAT_TIME) {
729     /* non-time format, pretent the current time segment is closed with a
730      * 0 start and unknown stop time. */
731     segment->format = GST_FORMAT_TIME;
732     segment->start = 0;
733     segment->stop = -1;
734     segment->time = 0;
735   }
736
737   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
738
739   if (is_sink)
740     queue->sink_tainted = TRUE;
741   else
742     queue->src_tainted = TRUE;
743
744   /* segment can update the time level of the queue */
745   update_time_level (queue);
746 }
747
748 static void
749 apply_gap (GstQueue2 * queue, GstEvent * event,
750     GstSegment * segment, gboolean is_sink)
751 {
752   GstClockTime timestamp;
753   GstClockTime duration;
754
755   gst_event_parse_gap (event, &timestamp, &duration);
756
757   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
758
759     if (GST_CLOCK_TIME_IS_VALID (duration)) {
760       timestamp += duration;
761     }
762
763     segment->position = timestamp;
764
765     if (is_sink)
766       queue->sink_tainted = TRUE;
767     else
768       queue->src_tainted = TRUE;
769
770     /* calc diff with other end */
771     update_time_level (queue);
772   }
773 }
774
775 /* take a buffer and update segment, updating the time level of the queue. */
776 static void
777 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
778     gboolean is_sink)
779 {
780   GstClockTime duration, timestamp;
781
782   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
783   duration = GST_BUFFER_DURATION (buffer);
784
785   /* if no timestamp is set, assume it's continuous with the previous
786    * time */
787   if (timestamp == GST_CLOCK_TIME_NONE)
788     timestamp = segment->position;
789
790   /* add duration */
791   if (duration != GST_CLOCK_TIME_NONE)
792     timestamp += duration;
793
794   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
795       GST_TIME_ARGS (timestamp));
796
797   segment->position = timestamp;
798
799   if (is_sink)
800     queue->sink_tainted = TRUE;
801   else
802     queue->src_tainted = TRUE;
803
804   /* calc diff with other end */
805   update_time_level (queue);
806 }
807
808 static gboolean
809 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
810 {
811   GstClockTime *timestamp = data;
812   GstClockTime btime;
813
814   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
815       " duration %" GST_TIME_FORMAT, idx,
816       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
817       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
818       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
819
820   btime = GST_BUFFER_DTS_OR_PTS (*buf);
821   if (GST_CLOCK_TIME_IS_VALID (btime))
822     *timestamp = btime;
823
824   if (GST_BUFFER_DURATION_IS_VALID (*buf))
825     *timestamp += GST_BUFFER_DURATION (*buf);
826
827   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
828   return TRUE;
829 }
830
831 /* take a buffer list and update segment, updating the time level of the queue */
832 static void
833 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
834     GstSegment * segment, gboolean is_sink)
835 {
836   GstClockTime timestamp;
837
838   /* if no timestamp is set, assume it's continuous with the previous time */
839   timestamp = segment->position;
840
841   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
842
843   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
844       GST_TIME_ARGS (timestamp));
845
846   segment->position = timestamp;
847
848   if (is_sink)
849     queue->sink_tainted = TRUE;
850   else
851     queue->src_tainted = TRUE;
852
853   /* calc diff with other end */
854   update_time_level (queue);
855 }
856
857 static gboolean
858 get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering,
859     gint * percent)
860 {
861   gint perc;
862
863   if (queue->high_percent <= 0) {
864     if (percent)
865       *percent = 100;
866     if (is_buffering)
867       *is_buffering = FALSE;
868     return FALSE;
869   }
870 #define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
871
872   if (queue->is_eos) {
873     /* on EOS we are always 100% full, we set the var here so that it we can
874      * reuse the logic below to stop buffering */
875     perc = 100;
876     GST_LOG_OBJECT (queue, "we are EOS");
877   } else {
878     /* figure out the percent we are filled, we take the max of all formats. */
879     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
880       perc = GET_PERCENT (bytes, 0);
881     } else {
882       guint64 rb_size = queue->ring_buffer_max_size;
883       perc = GET_PERCENT (bytes, rb_size);
884     }
885     perc = MAX (perc, GET_PERCENT (time, 0));
886     perc = MAX (perc, GET_PERCENT (buffers, 0));
887
888     /* also apply the rate estimate when we need to */
889     if (queue->use_rate_estimate)
890       perc = MAX (perc, GET_PERCENT (rate_time, 0));
891   }
892 #undef GET_PERCENT
893
894   if (is_buffering)
895     *is_buffering = queue->is_buffering;
896
897   /* scale to high percent so that it becomes the 100% mark */
898   perc = perc * 100 / queue->high_percent;
899   /* clip */
900   if (perc > 100)
901     perc = 100;
902
903   if (percent)
904     *percent = perc;
905
906   GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering,
907       perc);
908
909   return TRUE;
910 }
911
912 static void
913 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
914     gint * avg_in, gint * avg_out, gint64 * buffering_left)
915 {
916   if (mode) {
917     if (!QUEUE_IS_USING_QUEUE (queue)) {
918       if (QUEUE_IS_USING_RING_BUFFER (queue))
919         *mode = GST_BUFFERING_TIMESHIFT;
920       else
921         *mode = GST_BUFFERING_DOWNLOAD;
922     } else {
923       *mode = GST_BUFFERING_STREAM;
924     }
925   }
926
927   if (avg_in)
928     *avg_in = queue->byte_in_rate;
929   if (avg_out)
930     *avg_out = queue->byte_out_rate;
931
932   if (buffering_left) {
933     *buffering_left = (percent == 100 ? 0 : -1);
934
935     if (queue->use_rate_estimate) {
936       guint64 max, cur;
937
938       max = queue->max_level.rate_time;
939       cur = queue->cur_level.rate_time;
940
941       if (percent != 100 && max > cur)
942         *buffering_left = (max - cur) / 1000000;
943     }
944   }
945 }
946
947 static void
948 gst_queue2_post_buffering (GstQueue2 * queue)
949 {
950   GstMessage *msg = NULL;
951
952   g_mutex_lock (&queue->buffering_post_lock);
953   GST_QUEUE2_MUTEX_LOCK (queue);
954   if (queue->percent_changed) {
955     gint percent = queue->buffering_percent;
956
957     queue->percent_changed = FALSE;
958
959     GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
960     msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
961
962     gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
963         queue->avg_out, queue->buffering_left);
964   }
965   GST_QUEUE2_MUTEX_UNLOCK (queue);
966
967   if (msg != NULL)
968     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
969
970   g_mutex_unlock (&queue->buffering_post_lock);
971 }
972
973 static void
974 update_buffering (GstQueue2 * queue)
975 {
976   gint percent;
977
978   /* Ensure the variables used to calculate buffering state are up-to-date. */
979   if (queue->current)
980     update_cur_level (queue, queue->current);
981   update_in_rates (queue);
982
983   if (!get_buffering_percent (queue, NULL, &percent))
984     return;
985
986   if (queue->is_buffering) {
987     /* if we were buffering see if we reached the high watermark */
988     if (percent >= 100)
989       queue->is_buffering = FALSE;
990
991     SET_PERCENT (queue, percent);
992   } else {
993     /* we were not buffering, check if we need to start buffering if we drop
994      * below the low threshold */
995     if (percent < queue->low_percent) {
996       queue->is_buffering = TRUE;
997       SET_PERCENT (queue, percent);
998     }
999   }
1000 }
1001
1002 static void
1003 reset_rate_timer (GstQueue2 * queue)
1004 {
1005   queue->bytes_in = 0;
1006   queue->bytes_out = 0;
1007   queue->byte_in_rate = 0.0;
1008   queue->byte_in_period = 0;
1009   queue->byte_out_rate = 0.0;
1010   queue->last_update_in_rates_elapsed = 0.0;
1011   queue->last_in_elapsed = 0.0;
1012   queue->last_out_elapsed = 0.0;
1013   queue->in_timer_started = FALSE;
1014   queue->out_timer_started = FALSE;
1015 }
1016
1017 /* the interval in seconds to recalculate the rate */
1018 #define RATE_INTERVAL    0.2
1019 /* Tuning for rate estimation. We use a large window for the input rate because
1020  * it should be stable when connected to a network. The output rate is less
1021  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1022  * therefore adapt more quickly.
1023  * However, initial input rate may be subject to a burst, and should therefore
1024  * initially also adapt more quickly to changes, and only later on give higher
1025  * weight to previous values. */
1026 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1027 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1028
1029 static void
1030 update_in_rates (GstQueue2 * queue)
1031 {
1032   gdouble elapsed, period;
1033   gdouble byte_in_rate;
1034
1035   if (!queue->in_timer_started) {
1036     queue->in_timer_started = TRUE;
1037     g_timer_start (queue->in_timer);
1038     return;
1039   }
1040
1041   queue->last_update_in_rates_elapsed = elapsed =
1042       g_timer_elapsed (queue->in_timer, NULL);
1043
1044   /* recalc after each interval. */
1045   if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1046     period = elapsed - queue->last_in_elapsed;
1047
1048     GST_DEBUG_OBJECT (queue,
1049         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1050         period, queue->bytes_in, queue->byte_in_period);
1051
1052     byte_in_rate = queue->bytes_in / period;
1053
1054     if (queue->byte_in_rate == 0.0)
1055       queue->byte_in_rate = byte_in_rate;
1056     else
1057       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1058           (double) queue->byte_in_period, period);
1059
1060     /* another data point, cap at 16 for long time running average */
1061     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1062       queue->byte_in_period += period;
1063
1064     /* reset the values to calculate rate over the next interval */
1065     queue->last_in_elapsed = elapsed;
1066     queue->bytes_in = 0;
1067   }
1068
1069   if (queue->byte_in_rate > 0.0) {
1070     queue->cur_level.rate_time =
1071         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1072   }
1073   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1074       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1075 }
1076
1077 static void
1078 update_out_rates (GstQueue2 * queue)
1079 {
1080   gdouble elapsed, period;
1081   gdouble byte_out_rate;
1082
1083   if (!queue->out_timer_started) {
1084     queue->out_timer_started = TRUE;
1085     g_timer_start (queue->out_timer);
1086     return;
1087   }
1088
1089   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1090
1091   /* recalc after each interval. */
1092   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1093     period = elapsed - queue->last_out_elapsed;
1094
1095     GST_DEBUG_OBJECT (queue,
1096         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1097
1098     byte_out_rate = queue->bytes_out / period;
1099
1100     if (queue->byte_out_rate == 0.0)
1101       queue->byte_out_rate = byte_out_rate;
1102     else
1103       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1104
1105     /* reset the values to calculate rate over the next interval */
1106     queue->last_out_elapsed = elapsed;
1107     queue->bytes_out = 0;
1108   }
1109   if (queue->byte_in_rate > 0.0) {
1110     queue->cur_level.rate_time =
1111         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1112   }
1113   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1114       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1115 }
1116
1117 static void
1118 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1119 {
1120   guint64 reading_pos, max_reading_pos;
1121
1122   reading_pos = pos;
1123   max_reading_pos = range->max_reading_pos;
1124
1125   max_reading_pos = MAX (max_reading_pos, reading_pos);
1126
1127   GST_DEBUG_OBJECT (queue,
1128       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1129       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1130   range->max_reading_pos = max_reading_pos;
1131
1132   update_cur_level (queue, range);
1133 }
1134
1135 static gboolean
1136 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1137 {
1138   GstEvent *event;
1139   gboolean res;
1140
1141   /* until we receive the FLUSH_STOP from this seek, we skip data */
1142   queue->seeking = TRUE;
1143   GST_QUEUE2_MUTEX_UNLOCK (queue);
1144
1145   debug_ranges (queue);
1146
1147   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1148
1149   event =
1150       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1151       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1152       GST_SEEK_TYPE_NONE, -1);
1153
1154   res = gst_pad_push_event (queue->sinkpad, event);
1155   GST_QUEUE2_MUTEX_LOCK (queue);
1156
1157   if (res) {
1158     /* Between us sending the seek event and re-acquiring the lock, the source
1159      * thread might already have pushed data and moved along the range's
1160      * writing_pos beyond the seek offset. In that case we don't want to set
1161      * the writing position back to the requested seek position, as it would
1162      * cause data to be written to the wrong offset in the file or ring buffer.
1163      * We still do the add_range call to switch the current range to the
1164      * requested range, or create one if one doesn't exist yet. */
1165     queue->current = add_range (queue, offset, FALSE);
1166   }
1167
1168   return res;
1169 }
1170
1171 /* get the threshold for when we decide to seek rather than wait */
1172 static guint64
1173 get_seek_threshold (GstQueue2 * queue)
1174 {
1175   guint64 threshold;
1176
1177   /* FIXME, find a good threshold based on the incoming rate. */
1178   threshold = 1024 * 512;
1179
1180   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1181     threshold = MIN (threshold,
1182         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1183   }
1184   return threshold;
1185 }
1186
1187 /* see if there is enough data in the file to read a full buffer */
1188 static gboolean
1189 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1190 {
1191   GstQueue2Range *range;
1192
1193   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1194       offset, length);
1195
1196   if ((range = find_range (queue, offset))) {
1197     if (queue->current != range) {
1198       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1199       perform_seek_to_offset (queue, range->writing_pos);
1200     }
1201
1202     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1203         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1204
1205     /* we have a range for offset */
1206     GST_DEBUG_OBJECT (queue,
1207         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1208         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1209
1210     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1211       return TRUE;
1212
1213     if (offset + length <= range->writing_pos)
1214       return TRUE;
1215     else
1216       GST_DEBUG_OBJECT (queue,
1217           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1218           (offset + length) - range->writing_pos);
1219
1220   } else {
1221     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1222         " len %u", offset, length);
1223     /* we don't have the range, see how far away we are */
1224     if (!queue->is_eos && queue->current) {
1225       guint64 threshold = get_seek_threshold (queue);
1226
1227       if (offset >= queue->current->offset && offset <=
1228           queue->current->writing_pos + threshold) {
1229         GST_INFO_OBJECT (queue,
1230             "requested data is within range, wait for data");
1231         return FALSE;
1232       }
1233     }
1234
1235     /* too far away, do a seek */
1236     perform_seek_to_offset (queue, offset);
1237   }
1238
1239   return FALSE;
1240 }
1241
1242 #ifdef HAVE_FSEEKO
1243 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1244 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1245 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1246 #else
1247 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1248 #endif
1249
1250 static GstFlowReturn
1251 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1252     guint8 * dst, gint64 * read_return)
1253 {
1254   guint8 *ring_buffer;
1255   size_t res;
1256
1257   ring_buffer = queue->ring_buffer;
1258
1259   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1260     goto seek_failed;
1261
1262   /* this should not block */
1263   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1264       length, offset);
1265   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1266     res = fread (dst, 1, length, queue->temp_file);
1267   } else {
1268     memcpy (dst, ring_buffer + offset, length);
1269     res = length;
1270   }
1271
1272   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1273
1274   if (G_UNLIKELY (res < length)) {
1275     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1276       goto could_not_read;
1277     /* check for errors or EOF */
1278     if (ferror (queue->temp_file))
1279       goto could_not_read;
1280     if (feof (queue->temp_file) && length > 0)
1281       goto eos;
1282   }
1283
1284   *read_return = res;
1285
1286   return GST_FLOW_OK;
1287
1288 seek_failed:
1289   {
1290     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1291     return GST_FLOW_ERROR;
1292   }
1293 could_not_read:
1294   {
1295     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1296     return GST_FLOW_ERROR;
1297   }
1298 eos:
1299   {
1300     GST_DEBUG ("non-regular file hits EOS");
1301     return GST_FLOW_EOS;
1302   }
1303 }
1304
1305 static GstFlowReturn
1306 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1307     GstBuffer ** buffer)
1308 {
1309   GstBuffer *buf;
1310   GstMapInfo info;
1311   guint8 *data;
1312   guint64 file_offset;
1313   guint block_length, remaining, read_length;
1314   guint64 rb_size;
1315   guint64 max_size;
1316   guint64 rpos;
1317   GstFlowReturn ret = GST_FLOW_OK;
1318
1319   /* allocate the output buffer of the requested size */
1320   if (*buffer == NULL)
1321     buf = gst_buffer_new_allocate (NULL, length, NULL);
1322   else
1323     buf = *buffer;
1324
1325   gst_buffer_map (buf, &info, GST_MAP_WRITE);
1326   data = info.data;
1327
1328   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1329       offset);
1330
1331   rpos = offset;
1332   rb_size = queue->ring_buffer_max_size;
1333   max_size = QUEUE_MAX_BYTES (queue);
1334
1335   remaining = length;
1336   while (remaining > 0) {
1337     /* configure how much/whether to read */
1338     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1339       read_length = 0;
1340
1341       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1342         guint64 level;
1343
1344         /* calculate how far away the offset is */
1345         if (queue->current->writing_pos > rpos)
1346           level = queue->current->writing_pos - rpos;
1347         else
1348           level = 0;
1349
1350         GST_DEBUG_OBJECT (queue,
1351             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1352             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1353             rpos, queue->current->writing_pos, level, max_size);
1354
1355         if (level >= max_size) {
1356           /* we don't have the data but if we have a ring buffer that is full, we
1357            * need to read */
1358           GST_DEBUG_OBJECT (queue,
1359               "ring buffer full, reading QUEUE_MAX_BYTES %"
1360               G_GUINT64_FORMAT " bytes", max_size);
1361           read_length = max_size;
1362         } else if (queue->is_eos) {
1363           /* won't get any more data so read any data we have */
1364           if (level) {
1365             GST_DEBUG_OBJECT (queue,
1366                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1367                 level);
1368             read_length = level;
1369             remaining = level;
1370             length = level;
1371           } else
1372             goto hit_eos;
1373         }
1374       }
1375
1376       if (read_length == 0) {
1377         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1378           GST_DEBUG_OBJECT (queue,
1379               "update current position [%" G_GUINT64_FORMAT "-%"
1380               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1381           update_cur_pos (queue, queue->current, rpos);
1382           GST_QUEUE2_SIGNAL_DEL (queue);
1383         }
1384
1385         if (queue->use_buffering)
1386           update_buffering (queue);
1387
1388         GST_DEBUG_OBJECT (queue, "waiting for add");
1389         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1390         continue;
1391       }
1392     } else {
1393       /* we have the requested data so read it */
1394       read_length = remaining;
1395     }
1396
1397     /* set range reading_pos to actual reading position for this read */
1398     queue->current->reading_pos = rpos;
1399
1400     /* configure how much and from where to read */
1401     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1402       file_offset =
1403           (queue->current->rb_offset + (rpos -
1404               queue->current->offset)) % rb_size;
1405       if (file_offset + read_length > rb_size) {
1406         block_length = rb_size - file_offset;
1407       } else {
1408         block_length = read_length;
1409       }
1410     } else {
1411       file_offset = rpos;
1412       block_length = read_length;
1413     }
1414
1415     /* while we still have data to read, we loop */
1416     while (read_length > 0) {
1417       gint64 read_return;
1418
1419       ret =
1420           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1421           data, &read_return);
1422       if (ret != GST_FLOW_OK)
1423         goto read_error;
1424
1425       file_offset += read_return;
1426       if (QUEUE_IS_USING_RING_BUFFER (queue))
1427         file_offset %= rb_size;
1428
1429       data += read_return;
1430       read_length -= read_return;
1431       block_length = read_length;
1432       remaining -= read_return;
1433
1434       rpos = (queue->current->reading_pos += read_return);
1435       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1436     }
1437     GST_QUEUE2_SIGNAL_DEL (queue);
1438     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1439   }
1440
1441   gst_buffer_unmap (buf, &info);
1442   gst_buffer_resize (buf, 0, length);
1443
1444   GST_BUFFER_OFFSET (buf) = offset;
1445   GST_BUFFER_OFFSET_END (buf) = offset + length;
1446
1447   *buffer = buf;
1448
1449   return ret;
1450
1451   /* ERRORS */
1452 hit_eos:
1453   {
1454     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1455     gst_buffer_unmap (buf, &info);
1456     if (*buffer == NULL)
1457       gst_buffer_unref (buf);
1458     return GST_FLOW_EOS;
1459   }
1460 out_flushing:
1461   {
1462     GST_DEBUG_OBJECT (queue, "we are flushing");
1463     gst_buffer_unmap (buf, &info);
1464     if (*buffer == NULL)
1465       gst_buffer_unref (buf);
1466     return GST_FLOW_FLUSHING;
1467   }
1468 read_error:
1469   {
1470     GST_DEBUG_OBJECT (queue, "we have a read error");
1471     gst_buffer_unmap (buf, &info);
1472     if (*buffer == NULL)
1473       gst_buffer_unref (buf);
1474     return ret;
1475   }
1476 }
1477
1478 /* should be called with QUEUE_LOCK */
1479 static GstMiniObject *
1480 gst_queue2_read_item_from_file (GstQueue2 * queue)
1481 {
1482   GstMiniObject *item;
1483
1484   if (queue->stream_start_event != NULL) {
1485     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1486     queue->stream_start_event = NULL;
1487   } else if (queue->starting_segment != NULL) {
1488     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1489     queue->starting_segment = NULL;
1490   } else {
1491     GstFlowReturn ret;
1492     GstBuffer *buffer = NULL;
1493     guint64 reading_pos;
1494
1495     reading_pos = queue->current->reading_pos;
1496
1497     ret =
1498         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1499         &buffer);
1500
1501     switch (ret) {
1502       case GST_FLOW_OK:
1503         item = GST_MINI_OBJECT_CAST (buffer);
1504         break;
1505       case GST_FLOW_EOS:
1506         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1507         break;
1508       default:
1509         item = NULL;
1510         break;
1511     }
1512   }
1513   return item;
1514 }
1515
1516 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1517  * the temp filename. */
1518 static gboolean
1519 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1520 {
1521   gint fd = -1;
1522   gchar *name = NULL;
1523
1524   if (queue->temp_file)
1525     goto already_opened;
1526
1527   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1528
1529   /* If temp_template was set, allocate a filename and open that filen */
1530
1531   /* nothing to do */
1532   if (queue->temp_template == NULL)
1533     goto no_directory;
1534
1535   /* make copy of the template, we don't want to change this */
1536   name = g_strdup (queue->temp_template);
1537   fd = g_mkstemp (name);
1538   if (fd == -1)
1539     goto mkstemp_failed;
1540
1541   /* open the file for update/writing */
1542   queue->temp_file = fdopen (fd, "wb+");
1543   /* error creating file */
1544   if (queue->temp_file == NULL)
1545     goto open_failed;
1546
1547   g_free (queue->temp_location);
1548   queue->temp_location = name;
1549
1550   GST_QUEUE2_MUTEX_UNLOCK (queue);
1551
1552   /* we can't emit the notify with the lock */
1553   g_object_notify (G_OBJECT (queue), "temp-location");
1554
1555   GST_QUEUE2_MUTEX_LOCK (queue);
1556
1557   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1558
1559   return TRUE;
1560
1561   /* ERRORS */
1562 already_opened:
1563   {
1564     GST_DEBUG_OBJECT (queue, "temp file was already open");
1565     return TRUE;
1566   }
1567 no_directory:
1568   {
1569     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1570         (_("No Temp directory specified.")), (NULL));
1571     return FALSE;
1572   }
1573 mkstemp_failed:
1574   {
1575     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1576         (_("Could not create temp file \"%s\"."), queue->temp_template),
1577         GST_ERROR_SYSTEM);
1578     g_free (name);
1579     return FALSE;
1580   }
1581 open_failed:
1582   {
1583     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1584         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1585     g_free (name);
1586     if (fd != -1)
1587       close (fd);
1588     return FALSE;
1589   }
1590 }
1591
1592 static void
1593 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1594 {
1595   /* nothing to do */
1596   if (queue->temp_file == NULL)
1597     return;
1598
1599   GST_DEBUG_OBJECT (queue, "closing temp file");
1600
1601   fflush (queue->temp_file);
1602   fclose (queue->temp_file);
1603
1604   if (queue->temp_remove) {
1605     if (remove (queue->temp_location) < 0) {
1606       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1607           queue->temp_location, g_strerror (errno));
1608     }
1609   }
1610
1611   queue->temp_file = NULL;
1612   clean_ranges (queue);
1613 }
1614
1615 static void
1616 gst_queue2_flush_temp_file (GstQueue2 * queue)
1617 {
1618   if (queue->temp_file == NULL)
1619     return;
1620
1621   GST_DEBUG_OBJECT (queue, "flushing temp file");
1622
1623   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1624 }
1625
1626 static void
1627 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1628 {
1629   if (!QUEUE_IS_USING_QUEUE (queue)) {
1630     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1631       gst_queue2_flush_temp_file (queue);
1632     init_ranges (queue);
1633   } else {
1634     while (!g_queue_is_empty (&queue->queue)) {
1635       GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1636
1637       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1638           && GST_EVENT_IS_STICKY (qitem->item)
1639           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1640           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1641         gst_pad_store_sticky_event (queue->srcpad,
1642             GST_EVENT_CAST (qitem->item));
1643       }
1644
1645       /* Then lose another reference because we are supposed to destroy that
1646          data when flushing */
1647       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1648         gst_mini_object_unref (qitem->item);
1649       g_slice_free (GstQueue2Item, qitem);
1650     }
1651   }
1652   queue->last_query = FALSE;
1653   g_cond_signal (&queue->query_handled);
1654   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1655   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1656   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1657   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1658   queue->sink_tainted = queue->src_tainted = TRUE;
1659   if (queue->starting_segment != NULL)
1660     gst_event_unref (queue->starting_segment);
1661   queue->starting_segment = NULL;
1662   queue->segment_event_received = FALSE;
1663   gst_event_replace (&queue->stream_start_event, NULL);
1664
1665   /* we deleted a lot of something */
1666   GST_QUEUE2_SIGNAL_DEL (queue);
1667 }
1668
1669 static gboolean
1670 gst_queue2_wait_free_space (GstQueue2 * queue)
1671 {
1672   /* We make space available if we're "full" according to whatever
1673    * the user defined as "full". */
1674   if (gst_queue2_is_filled (queue)) {
1675     gboolean started;
1676
1677     /* pause the timer while we wait. The fact that we are waiting does not mean
1678      * the byterate on the input pad is lower */
1679     if ((started = queue->in_timer_started))
1680       g_timer_stop (queue->in_timer);
1681
1682     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1683         "queue is full, waiting for free space");
1684     do {
1685       GST_QUEUE2_MUTEX_UNLOCK (queue);
1686       g_signal_emit (queue, gst_queue2_signals[SIGNAL_OVERRUN], 0);
1687       GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
1688       /* we recheck, the signal could have changed the thresholds */
1689       if (!gst_queue2_is_filled (queue))
1690         break;
1691
1692       /* Wait for space to be available, we could be unlocked because of a flush. */
1693       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1694     }
1695     while (gst_queue2_is_filled (queue));
1696
1697     /* and continue if we were running before */
1698     if (started)
1699       g_timer_continue (queue->in_timer);
1700   }
1701   return TRUE;
1702
1703   /* ERRORS */
1704 out_flushing:
1705   {
1706     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1707     return FALSE;
1708   }
1709 }
1710
1711 static gboolean
1712 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1713 {
1714   GstMapInfo info;
1715   guint8 *data, *ring_buffer;
1716   guint size, rb_size;
1717   guint64 writing_pos, new_writing_pos;
1718   GstQueue2Range *range, *prev, *next;
1719   gboolean do_seek = FALSE;
1720
1721   if (QUEUE_IS_USING_RING_BUFFER (queue))
1722     writing_pos = queue->current->rb_writing_pos;
1723   else
1724     writing_pos = queue->current->writing_pos;
1725   ring_buffer = queue->ring_buffer;
1726   rb_size = queue->ring_buffer_max_size;
1727
1728   gst_buffer_map (buffer, &info, GST_MAP_READ);
1729
1730   size = info.size;
1731   data = info.data;
1732
1733   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1734       writing_pos);
1735
1736   /* sanity check */
1737   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1738       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1739     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1740         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1741         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1742   }
1743
1744   while (size > 0) {
1745     guint to_write;
1746
1747     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1748       gint64 space;
1749
1750       /* calculate the space in the ring buffer not used by data from
1751        * the current range */
1752       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1753         /* wait until there is some free space */
1754         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1755       }
1756       /* get the amount of space we have */
1757       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1758
1759       /* calculate if we need to split or if we can write the entire
1760        * buffer now */
1761       to_write = MIN (size, space);
1762
1763       /* the writing position in the ring buffer after writing (part
1764        * or all of) the buffer */
1765       new_writing_pos = (writing_pos + to_write) % rb_size;
1766
1767       prev = NULL;
1768       range = queue->ranges;
1769
1770       /* if we need to overwrite data in the ring buffer, we need to
1771        * update the ranges
1772        *
1773        * warning: this code is complicated and includes some
1774        * simplifications - pen, paper and diagrams for the cases
1775        * recommended! */
1776       while (range) {
1777         guint64 range_data_start, range_data_end;
1778         GstQueue2Range *range_to_destroy = NULL;
1779
1780         range_data_start = range->rb_offset;
1781         range_data_end = range->rb_writing_pos;
1782
1783         /* handle the special case where the range has no data in it */
1784         if (range->writing_pos == range->offset) {
1785           if (range != queue->current) {
1786             GST_DEBUG_OBJECT (queue,
1787                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1788                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1789             /* remove range */
1790             range_to_destroy = range;
1791             if (prev)
1792               prev->next = range->next;
1793           }
1794           goto next_range;
1795         }
1796
1797         if (range_data_end > range_data_start) {
1798           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1799             goto next_range;
1800
1801           if (new_writing_pos > range_data_start) {
1802             if (new_writing_pos >= range_data_end) {
1803               GST_DEBUG_OBJECT (queue,
1804                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1805                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1806               /* remove range */
1807               range_to_destroy = range;
1808               if (prev)
1809                 prev->next = range->next;
1810             } else {
1811               GST_DEBUG_OBJECT (queue,
1812                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1813                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1814                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1815                   range->offset + new_writing_pos - range_data_start,
1816                   new_writing_pos);
1817               range->offset += (new_writing_pos - range_data_start);
1818               range->rb_offset = new_writing_pos;
1819             }
1820           }
1821         } else {
1822           guint64 new_wpos_virt = writing_pos + to_write;
1823
1824           if (new_wpos_virt <= range_data_start)
1825             goto next_range;
1826
1827           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1828             GST_DEBUG_OBJECT (queue,
1829                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1830                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1831             /* remove range */
1832             range_to_destroy = range;
1833             if (prev)
1834               prev->next = range->next;
1835           } else {
1836             GST_DEBUG_OBJECT (queue,
1837                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1838                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1839                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1840                 range->offset + new_writing_pos - range_data_start,
1841                 new_writing_pos);
1842             range->offset += (new_wpos_virt - range_data_start);
1843             range->rb_offset = new_writing_pos;
1844           }
1845         }
1846
1847       next_range:
1848         if (!range_to_destroy)
1849           prev = range;
1850
1851         range = range->next;
1852         if (range_to_destroy) {
1853           if (range_to_destroy == queue->ranges)
1854             queue->ranges = range;
1855           g_slice_free (GstQueue2Range, range_to_destroy);
1856           range_to_destroy = NULL;
1857         }
1858       }
1859     } else {
1860       to_write = size;
1861       new_writing_pos = writing_pos + to_write;
1862     }
1863
1864     if (QUEUE_IS_USING_TEMP_FILE (queue)
1865         && FSEEK_FILE (queue->temp_file, writing_pos))
1866       goto seek_failed;
1867
1868     if (new_writing_pos > writing_pos) {
1869       GST_INFO_OBJECT (queue,
1870           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1871           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
1872           queue->current->writing_pos, queue->current->rb_writing_pos);
1873       /* either not using ring buffer or no wrapping, just write */
1874       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1875         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
1876           goto handle_error;
1877       } else {
1878         memcpy (ring_buffer + writing_pos, data, to_write);
1879       }
1880
1881       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1882         /* try to merge with next range */
1883         while ((next = queue->current->next)) {
1884           GST_INFO_OBJECT (queue,
1885               "checking merge with next range %" G_GUINT64_FORMAT " < %"
1886               G_GUINT64_FORMAT, new_writing_pos, next->offset);
1887           if (new_writing_pos < next->offset)
1888             break;
1889
1890           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
1891               next->writing_pos);
1892
1893           /* remove the group */
1894           queue->current->next = next->next;
1895
1896           /* We use the threshold to decide if we want to do a seek or simply
1897            * read the data again. If there is not so much data in the range we
1898            * prefer to avoid to seek and read it again. */
1899           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
1900             /* the new range had more data than the threshold, it's worth keeping
1901              * it and doing a seek. */
1902             new_writing_pos = next->writing_pos;
1903             do_seek = TRUE;
1904           }
1905           g_slice_free (GstQueue2Range, next);
1906         }
1907         goto update_and_signal;
1908       }
1909     } else {
1910       /* wrapping */
1911       guint block_one, block_two;
1912
1913       block_one = rb_size - writing_pos;
1914       block_two = to_write - block_one;
1915
1916       if (block_one > 0) {
1917         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
1918         /* write data to end of ring buffer */
1919         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1920           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
1921             goto handle_error;
1922         } else {
1923           memcpy (ring_buffer + writing_pos, data, block_one);
1924         }
1925       }
1926
1927       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
1928         goto seek_failed;
1929
1930       if (block_two > 0) {
1931         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
1932         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1933           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
1934             goto handle_error;
1935         } else {
1936           memcpy (ring_buffer, data + block_one, block_two);
1937         }
1938       }
1939     }
1940
1941   update_and_signal:
1942     /* update the writing positions */
1943     size -= to_write;
1944     GST_INFO_OBJECT (queue,
1945         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
1946         to_write, writing_pos, size);
1947
1948     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1949       data += to_write;
1950       queue->current->writing_pos += to_write;
1951       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
1952     } else {
1953       queue->current->writing_pos = writing_pos = new_writing_pos;
1954     }
1955     if (do_seek)
1956       perform_seek_to_offset (queue, new_writing_pos);
1957
1958     update_cur_level (queue, queue->current);
1959
1960     /* update the buffering status */
1961     if (queue->use_buffering)
1962       update_buffering (queue);
1963
1964     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1965         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1966
1967     GST_QUEUE2_SIGNAL_ADD (queue);
1968   }
1969
1970   gst_buffer_unmap (buffer, &info);
1971
1972   return TRUE;
1973
1974   /* ERRORS */
1975 out_flushing:
1976   {
1977     GST_DEBUG_OBJECT (queue, "we are flushing");
1978     gst_buffer_unmap (buffer, &info);
1979     /* FIXME - GST_FLOW_EOS ? */
1980     return FALSE;
1981   }
1982 seek_failed:
1983   {
1984     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1985     gst_buffer_unmap (buffer, &info);
1986     return FALSE;
1987   }
1988 handle_error:
1989   {
1990     switch (errno) {
1991       case ENOSPC:{
1992         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
1993         break;
1994       }
1995       default:{
1996         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
1997             (_("Error while writing to download file.")),
1998             ("%s", g_strerror (errno)));
1999       }
2000     }
2001     gst_buffer_unmap (buffer, &info);
2002     return FALSE;
2003   }
2004 }
2005
2006 static gboolean
2007 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2008 {
2009   GstQueue2 *queue = q;
2010
2011   GST_TRACE_OBJECT (queue,
2012       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2013       gst_buffer_get_size (*buf));
2014
2015   if (!gst_queue2_create_write (queue, *buf)) {
2016     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2017     return FALSE;
2018   }
2019   return TRUE;
2020 }
2021
2022 static gboolean
2023 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
2024 {
2025   guint *p_size = data;
2026   gsize buf_size;
2027
2028   buf_size = gst_buffer_get_size (*buf);
2029   GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
2030   *p_size += buf_size;
2031   return TRUE;
2032 }
2033
2034 /* enqueue an item an update the level stats */
2035 static void
2036 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2037     GstQueue2ItemType item_type)
2038 {
2039   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2040     GstBuffer *buffer;
2041     guint size;
2042
2043     buffer = GST_BUFFER_CAST (item);
2044     size = gst_buffer_get_size (buffer);
2045
2046     /* add buffer to the statistics */
2047     if (QUEUE_IS_USING_QUEUE (queue)) {
2048       queue->cur_level.buffers++;
2049       queue->cur_level.bytes += size;
2050     }
2051     queue->bytes_in += size;
2052
2053     /* apply new buffer to segment stats */
2054     apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
2055     /* update the byterate stats */
2056     update_in_rates (queue);
2057
2058     if (!QUEUE_IS_USING_QUEUE (queue)) {
2059       /* FIXME - check return value? */
2060       gst_queue2_create_write (queue, buffer);
2061     }
2062   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2063     GstBufferList *buffer_list;
2064     guint size = 0;
2065
2066     buffer_list = GST_BUFFER_LIST_CAST (item);
2067
2068     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2069     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2070
2071     /* add buffer to the statistics */
2072     if (QUEUE_IS_USING_QUEUE (queue)) {
2073       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2074       queue->cur_level.bytes += size;
2075     }
2076     queue->bytes_in += size;
2077
2078     /* apply new buffer to segment stats */
2079     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2080
2081     /* update the byterate stats */
2082     update_in_rates (queue);
2083
2084     if (!QUEUE_IS_USING_QUEUE (queue)) {
2085       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2086     }
2087   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2088     GstEvent *event;
2089
2090     event = GST_EVENT_CAST (item);
2091
2092     switch (GST_EVENT_TYPE (event)) {
2093       case GST_EVENT_EOS:
2094         /* Zero the thresholds, this makes sure the queue is completely
2095          * filled and we can read all data from the queue. */
2096         GST_DEBUG_OBJECT (queue, "we have EOS");
2097         queue->is_eos = TRUE;
2098         break;
2099       case GST_EVENT_SEGMENT:
2100         apply_segment (queue, event, &queue->sink_segment, TRUE);
2101         /* This is our first new segment, we hold it
2102          * as we can't save it on the temp file */
2103         if (!QUEUE_IS_USING_QUEUE (queue)) {
2104           if (queue->segment_event_received)
2105             goto unexpected_event;
2106
2107           queue->segment_event_received = TRUE;
2108           if (queue->starting_segment != NULL)
2109             gst_event_unref (queue->starting_segment);
2110           queue->starting_segment = event;
2111           item = NULL;
2112         }
2113         /* a new segment allows us to accept more buffers if we got EOS
2114          * from downstream */
2115         queue->unexpected = FALSE;
2116         break;
2117       case GST_EVENT_GAP:
2118         apply_gap (queue, event, &queue->sink_segment, TRUE);
2119         break;
2120       case GST_EVENT_STREAM_START:
2121         if (!QUEUE_IS_USING_QUEUE (queue)) {
2122           gst_event_replace (&queue->stream_start_event, event);
2123           gst_event_unref (event);
2124           item = NULL;
2125         }
2126         break;
2127       case GST_EVENT_CAPS:{
2128         GstCaps *caps;
2129
2130         gst_event_parse_caps (event, &caps);
2131         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2132
2133         if (!QUEUE_IS_USING_QUEUE (queue)) {
2134           GST_LOG ("Dropping caps event, not using queue");
2135           gst_event_unref (event);
2136           item = NULL;
2137         }
2138         break;
2139       }
2140       default:
2141         if (!QUEUE_IS_USING_QUEUE (queue))
2142           goto unexpected_event;
2143         break;
2144     }
2145   } else if (GST_IS_QUERY (item)) {
2146     /* Can't happen as we check that in the caller */
2147     if (!QUEUE_IS_USING_QUEUE (queue))
2148       g_assert_not_reached ();
2149   } else {
2150     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2151         item, GST_OBJECT_NAME (queue));
2152     /* we can't really unref since we don't know what it is */
2153     item = NULL;
2154   }
2155
2156   if (item) {
2157     /* update the buffering status */
2158     if (queue->use_buffering)
2159       update_buffering (queue);
2160
2161     if (QUEUE_IS_USING_QUEUE (queue)) {
2162       GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2163       qitem->type = item_type;
2164       qitem->item = item;
2165       g_queue_push_tail (&queue->queue, qitem);
2166     } else {
2167       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2168     }
2169
2170     GST_QUEUE2_SIGNAL_ADD (queue);
2171   }
2172
2173   return;
2174
2175   /* ERRORS */
2176 unexpected_event:
2177   {
2178     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2179
2180     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2181         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2182         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2183     gst_event_unref (GST_EVENT_CAST (item));
2184     return;
2185   }
2186 }
2187
2188 /* dequeue an item from the queue and update level stats */
2189 static GstMiniObject *
2190 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2191 {
2192   GstMiniObject *item;
2193
2194   if (!QUEUE_IS_USING_QUEUE (queue)) {
2195     item = gst_queue2_read_item_from_file (queue);
2196   } else {
2197     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2198
2199     if (qitem == NULL)
2200       goto no_item;
2201
2202     item = qitem->item;
2203     g_slice_free (GstQueue2Item, qitem);
2204   }
2205
2206   if (item == NULL)
2207     goto no_item;
2208
2209   if (GST_IS_BUFFER (item)) {
2210     GstBuffer *buffer;
2211     guint size;
2212
2213     buffer = GST_BUFFER_CAST (item);
2214     size = gst_buffer_get_size (buffer);
2215     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2216
2217     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2218         "retrieved buffer %p from queue", buffer);
2219
2220     if (QUEUE_IS_USING_QUEUE (queue)) {
2221       queue->cur_level.buffers--;
2222       queue->cur_level.bytes -= size;
2223     }
2224     queue->bytes_out += size;
2225
2226     apply_buffer (queue, buffer, &queue->src_segment, FALSE);
2227     /* update the byterate stats */
2228     update_out_rates (queue);
2229     /* update the buffering */
2230     if (queue->use_buffering)
2231       update_buffering (queue);
2232
2233   } else if (GST_IS_EVENT (item)) {
2234     GstEvent *event = GST_EVENT_CAST (item);
2235
2236     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2237
2238     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2239         "retrieved event %p from queue", event);
2240
2241     switch (GST_EVENT_TYPE (event)) {
2242       case GST_EVENT_EOS:
2243         /* queue is empty now that we dequeued the EOS */
2244         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2245         break;
2246       case GST_EVENT_SEGMENT:
2247         apply_segment (queue, event, &queue->src_segment, FALSE);
2248         break;
2249       case GST_EVENT_GAP:
2250         apply_gap (queue, event, &queue->src_segment, FALSE);
2251         break;
2252       default:
2253         break;
2254     }
2255   } else if (GST_IS_BUFFER_LIST (item)) {
2256     GstBufferList *buffer_list;
2257     guint size = 0;
2258
2259     buffer_list = GST_BUFFER_LIST_CAST (item);
2260     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2261     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2262
2263     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2264         "retrieved buffer list %p from queue", buffer_list);
2265
2266     if (QUEUE_IS_USING_QUEUE (queue)) {
2267       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2268       queue->cur_level.bytes -= size;
2269     }
2270     queue->bytes_out += size;
2271
2272     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2273     /* update the byterate stats */
2274     update_out_rates (queue);
2275     /* update the buffering */
2276     if (queue->use_buffering)
2277       update_buffering (queue);
2278   } else if (GST_IS_QUERY (item)) {
2279     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2280         "retrieved query %p from queue", item);
2281     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2282   } else {
2283     g_warning
2284         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2285         item, GST_OBJECT_NAME (queue));
2286     item = NULL;
2287     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2288   }
2289   GST_QUEUE2_SIGNAL_DEL (queue);
2290
2291   return item;
2292
2293   /* ERRORS */
2294 no_item:
2295   {
2296     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2297     return NULL;
2298   }
2299 }
2300
2301 static gboolean
2302 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2303     GstEvent * event)
2304 {
2305   gboolean ret = TRUE;
2306   GstQueue2 *queue;
2307
2308   queue = GST_QUEUE2 (parent);
2309
2310   switch (GST_EVENT_TYPE (event)) {
2311     case GST_EVENT_FLUSH_START:
2312     {
2313       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2314       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2315         /* forward event */
2316         ret = gst_pad_push_event (queue->srcpad, event);
2317
2318         /* now unblock the chain function */
2319         GST_QUEUE2_MUTEX_LOCK (queue);
2320         queue->srcresult = GST_FLOW_FLUSHING;
2321         queue->sinkresult = GST_FLOW_FLUSHING;
2322         /* unblock the loop and chain functions */
2323         GST_QUEUE2_SIGNAL_ADD (queue);
2324         GST_QUEUE2_SIGNAL_DEL (queue);
2325         queue->last_query = FALSE;
2326         g_cond_signal (&queue->query_handled);
2327         GST_QUEUE2_MUTEX_UNLOCK (queue);
2328
2329         /* make sure it pauses, this should happen since we sent
2330          * flush_start downstream. */
2331         gst_pad_pause_task (queue->srcpad);
2332         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2333       } else {
2334         GST_QUEUE2_MUTEX_LOCK (queue);
2335         /* flush the sink pad */
2336         queue->sinkresult = GST_FLOW_FLUSHING;
2337         GST_QUEUE2_SIGNAL_DEL (queue);
2338         queue->last_query = FALSE;
2339         g_cond_signal (&queue->query_handled);
2340         GST_QUEUE2_MUTEX_UNLOCK (queue);
2341
2342         gst_event_unref (event);
2343       }
2344       break;
2345     }
2346     case GST_EVENT_FLUSH_STOP:
2347     {
2348       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2349
2350       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2351         /* forward event */
2352         ret = gst_pad_push_event (queue->srcpad, event);
2353
2354         GST_QUEUE2_MUTEX_LOCK (queue);
2355         gst_queue2_locked_flush (queue, FALSE, TRUE);
2356         queue->srcresult = GST_FLOW_OK;
2357         queue->sinkresult = GST_FLOW_OK;
2358         queue->is_eos = FALSE;
2359         queue->unexpected = FALSE;
2360         queue->seeking = FALSE;
2361         /* reset rate counters */
2362         reset_rate_timer (queue);
2363         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2364             queue->srcpad, NULL);
2365         GST_QUEUE2_MUTEX_UNLOCK (queue);
2366       } else {
2367         GST_QUEUE2_MUTEX_LOCK (queue);
2368         queue->segment_event_received = FALSE;
2369         queue->is_eos = FALSE;
2370         queue->unexpected = FALSE;
2371         queue->sinkresult = GST_FLOW_OK;
2372         queue->seeking = FALSE;
2373         GST_QUEUE2_MUTEX_UNLOCK (queue);
2374
2375         gst_event_unref (event);
2376       }
2377       break;
2378     }
2379     default:
2380       if (GST_EVENT_IS_SERIALIZED (event)) {
2381         /* serialized events go in the queue */
2382         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2383         if (queue->srcresult != GST_FLOW_OK) {
2384           /* Errors in sticky event pushing are no problem and ignored here
2385            * as they will cause more meaningful errors during data flow.
2386            * For EOS events, that are not followed by data flow, we still
2387            * return FALSE here though and report an error.
2388            */
2389           if (!GST_EVENT_IS_STICKY (event)) {
2390             goto out_flow_error;
2391           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2392             if (queue->srcresult == GST_FLOW_NOT_LINKED
2393                 || queue->srcresult < GST_FLOW_EOS) {
2394               GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2395                   (_("Internal data flow error.")),
2396                   ("streaming task paused, reason %s (%d)",
2397                       gst_flow_get_name (queue->srcresult), queue->srcresult));
2398             }
2399             goto out_flow_error;
2400           }
2401         }
2402         /* refuse more events on EOS */
2403         if (queue->is_eos)
2404           goto out_eos;
2405         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2406         GST_QUEUE2_MUTEX_UNLOCK (queue);
2407         gst_queue2_post_buffering (queue);
2408       } else {
2409         /* non-serialized events are passed upstream. */
2410         ret = gst_pad_push_event (queue->srcpad, event);
2411       }
2412       break;
2413   }
2414   return ret;
2415
2416   /* ERRORS */
2417 out_flushing:
2418   {
2419     GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2420     GST_QUEUE2_MUTEX_UNLOCK (queue);
2421     gst_event_unref (event);
2422     return FALSE;
2423   }
2424 out_eos:
2425   {
2426     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2427     GST_QUEUE2_MUTEX_UNLOCK (queue);
2428     gst_event_unref (event);
2429     return FALSE;
2430   }
2431 out_flow_error:
2432   {
2433     GST_LOG_OBJECT (queue,
2434         "refusing event, we have a downstream flow error: %s",
2435         gst_flow_get_name (queue->srcresult));
2436     GST_QUEUE2_MUTEX_UNLOCK (queue);
2437     gst_event_unref (event);
2438     return FALSE;
2439   }
2440 }
2441
2442 static gboolean
2443 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2444     GstQuery * query)
2445 {
2446   GstQueue2 *queue;
2447   gboolean res;
2448
2449   queue = GST_QUEUE2 (parent);
2450
2451   switch (GST_QUERY_TYPE (query)) {
2452     default:
2453       if (GST_QUERY_IS_SERIALIZED (query)) {
2454         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2455         /* serialized events go in the queue. We need to be certain that we
2456          * don't cause deadlocks waiting for the query return value. We check if
2457          * the queue is empty (nothing is blocking downstream and the query can
2458          * be pushed for sure) or we are not buffering. If we are buffering,
2459          * the pipeline waits to unblock downstream until our queue fills up
2460          * completely, which can not happen if we block on the query..
2461          * Therefore we only potentially block when we are not buffering. */
2462         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2463         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2464                 || !queue->use_buffering)) {
2465           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2466             gst_queue2_locked_enqueue (queue, query,
2467                 GST_QUEUE2_ITEM_TYPE_QUERY);
2468
2469             STATUS (queue, queue->sinkpad, "wait for QUERY");
2470             g_cond_wait (&queue->query_handled, &queue->qlock);
2471             if (queue->sinkresult != GST_FLOW_OK)
2472               goto out_flushing;
2473             res = queue->last_query;
2474           } else {
2475             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2476             res = FALSE;
2477           }
2478         } else {
2479           GST_DEBUG_OBJECT (queue,
2480               "refusing query, we are not using the queue");
2481           res = FALSE;
2482         }
2483         GST_QUEUE2_MUTEX_UNLOCK (queue);
2484         gst_queue2_post_buffering (queue);
2485       } else {
2486         res = gst_pad_query_default (pad, parent, query);
2487       }
2488       break;
2489   }
2490   return res;
2491
2492   /* ERRORS */
2493 out_flushing:
2494   {
2495     GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
2496     GST_QUEUE2_MUTEX_UNLOCK (queue);
2497     return FALSE;
2498   }
2499 }
2500
2501 static gboolean
2502 gst_queue2_is_empty (GstQueue2 * queue)
2503 {
2504   /* never empty on EOS */
2505   if (queue->is_eos)
2506     return FALSE;
2507
2508   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2509     return queue->current->writing_pos <= queue->current->max_reading_pos;
2510   } else {
2511     if (queue->queue.length == 0)
2512       return TRUE;
2513   }
2514
2515   return FALSE;
2516 }
2517
2518 static gboolean
2519 gst_queue2_is_filled (GstQueue2 * queue)
2520 {
2521   gboolean res;
2522
2523   /* always filled on EOS */
2524   if (queue->is_eos)
2525     return TRUE;
2526
2527 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2528     (queue->cur_level.format) >= ((alt_max) ? \
2529       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2530
2531   /* if using a ring buffer we're filled if all ring buffer space is used
2532    * _by the current range_ */
2533   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2534     guint64 rb_size = queue->ring_buffer_max_size;
2535     GST_DEBUG_OBJECT (queue,
2536         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2537         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2538     return CHECK_FILLED (bytes, rb_size);
2539   }
2540
2541   /* if using file, we're never filled if we don't have EOS */
2542   if (QUEUE_IS_USING_TEMP_FILE (queue))
2543     return FALSE;
2544
2545   /* we are never filled when we have no buffers at all */
2546   if (queue->cur_level.buffers == 0)
2547     return FALSE;
2548
2549   /* we are filled if one of the current levels exceeds the max */
2550   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2551       || CHECK_FILLED (time, 0);
2552
2553   /* if we need to, use the rate estimate to check against the max time we are
2554    * allowed to queue */
2555   if (queue->use_rate_estimate)
2556     res |= CHECK_FILLED (rate_time, 0);
2557
2558 #undef CHECK_FILLED
2559   return res;
2560 }
2561
2562 static GstFlowReturn
2563 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2564     GstMiniObject * item, GstQueue2ItemType item_type)
2565 {
2566   /* we have to lock the queue since we span threads */
2567   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2568   /* when we received EOS, we refuse more data */
2569   if (queue->is_eos)
2570     goto out_eos;
2571   /* when we received unexpected from downstream, refuse more buffers */
2572   if (queue->unexpected)
2573     goto out_unexpected;
2574
2575   /* while we didn't receive the newsegment, we're seeking and we skip data */
2576   if (queue->seeking)
2577     goto out_seeking;
2578
2579   if (!gst_queue2_wait_free_space (queue))
2580     goto out_flushing;
2581
2582   /* put buffer in queue now */
2583   gst_queue2_locked_enqueue (queue, item, item_type);
2584   GST_QUEUE2_MUTEX_UNLOCK (queue);
2585   gst_queue2_post_buffering (queue);
2586
2587   return GST_FLOW_OK;
2588
2589   /* special conditions */
2590 out_flushing:
2591   {
2592     GstFlowReturn ret = queue->sinkresult;
2593
2594     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2595         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2596     GST_QUEUE2_MUTEX_UNLOCK (queue);
2597     gst_mini_object_unref (item);
2598
2599     return ret;
2600   }
2601 out_eos:
2602   {
2603     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2604     GST_QUEUE2_MUTEX_UNLOCK (queue);
2605     gst_mini_object_unref (item);
2606
2607     return GST_FLOW_EOS;
2608   }
2609 out_seeking:
2610   {
2611     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2612     GST_QUEUE2_MUTEX_UNLOCK (queue);
2613     gst_mini_object_unref (item);
2614
2615     return GST_FLOW_OK;
2616   }
2617 out_unexpected:
2618   {
2619     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2620     GST_QUEUE2_MUTEX_UNLOCK (queue);
2621     gst_mini_object_unref (item);
2622
2623     return GST_FLOW_EOS;
2624   }
2625 }
2626
2627 static GstFlowReturn
2628 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2629 {
2630   GstQueue2 *queue;
2631
2632   queue = GST_QUEUE2 (parent);
2633
2634   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2635       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2636       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2637       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2638       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2639
2640   return gst_queue2_chain_buffer_or_buffer_list (queue,
2641       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2642 }
2643
2644 static GstFlowReturn
2645 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2646     GstBufferList * buffer_list)
2647 {
2648   GstQueue2 *queue;
2649
2650   queue = GST_QUEUE2 (parent);
2651
2652   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2653       "received buffer list %p", buffer_list);
2654
2655   return gst_queue2_chain_buffer_or_buffer_list (queue,
2656       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2657 }
2658
2659 static GstMiniObject *
2660 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2661 {
2662   GstMiniObject *data;
2663
2664   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2665
2666   /* stop pushing buffers, we dequeue all items until we see an item that we
2667    * can push again, which is EOS or SEGMENT. If there is nothing in the
2668    * queue we can push, we set a flag to make the sinkpad refuse more
2669    * buffers with an EOS return value until we receive something
2670    * pushable again or we get flushed. */
2671   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2672     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2673       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2674           "dropping EOS buffer %p", data);
2675       gst_buffer_unref (GST_BUFFER_CAST (data));
2676     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2677       GstEvent *event = GST_EVENT_CAST (data);
2678       GstEventType type = GST_EVENT_TYPE (event);
2679
2680       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2681         /* we found a pushable item in the queue, push it out */
2682         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2683             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2684         return data;
2685       }
2686       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2687           "dropping EOS event %p", event);
2688       gst_event_unref (event);
2689     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2690       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2691           "dropping EOS buffer list %p", data);
2692       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2693     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2694       queue->last_query = FALSE;
2695       g_cond_signal (&queue->query_handled);
2696       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2697     }
2698   }
2699   /* no more items in the queue. Set the unexpected flag so that upstream
2700    * make us refuse any more buffers on the sinkpad. Since we will still
2701    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2702    * task function does not shut down. */
2703   queue->unexpected = TRUE;
2704   return NULL;
2705 }
2706
2707 /* dequeue an item from the queue an push it downstream. This functions returns
2708  * the result of the push. */
2709 static GstFlowReturn
2710 gst_queue2_push_one (GstQueue2 * queue)
2711 {
2712   GstFlowReturn result = queue->srcresult;
2713   GstMiniObject *data;
2714   GstQueue2ItemType item_type;
2715
2716   data = gst_queue2_locked_dequeue (queue, &item_type);
2717   if (data == NULL)
2718     goto no_item;
2719
2720 next:
2721   STATUS (queue, queue->srcpad, "We have something dequeud");
2722   g_atomic_int_set (&queue->downstream_may_block,
2723       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2724       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2725   GST_QUEUE2_MUTEX_UNLOCK (queue);
2726   gst_queue2_post_buffering (queue);
2727
2728   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2729     GstBuffer *buffer;
2730
2731     buffer = GST_BUFFER_CAST (data);
2732
2733     result = gst_pad_push (queue->srcpad, buffer);
2734     g_atomic_int_set (&queue->downstream_may_block, 0);
2735
2736     /* need to check for srcresult here as well */
2737     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2738     if (result == GST_FLOW_EOS) {
2739       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2740       if (data != NULL)
2741         goto next;
2742       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2743        * to the caller so that the task function does not shut down */
2744       result = GST_FLOW_OK;
2745     }
2746   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2747     GstEvent *event = GST_EVENT_CAST (data);
2748     GstEventType type = GST_EVENT_TYPE (event);
2749
2750     gst_pad_push_event (queue->srcpad, event);
2751
2752     /* if we're EOS, return EOS so that the task pauses. */
2753     if (type == GST_EVENT_EOS) {
2754       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2755           "pushed EOS event %p, return EOS", event);
2756       result = GST_FLOW_EOS;
2757     }
2758
2759     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2760   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2761     GstBufferList *buffer_list;
2762
2763     buffer_list = GST_BUFFER_LIST_CAST (data);
2764
2765     result = gst_pad_push_list (queue->srcpad, buffer_list);
2766     g_atomic_int_set (&queue->downstream_may_block, 0);
2767
2768     /* need to check for srcresult here as well */
2769     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2770     if (result == GST_FLOW_EOS) {
2771       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2772       if (data != NULL)
2773         goto next;
2774       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2775        * to the caller so that the task function does not shut down */
2776       result = GST_FLOW_OK;
2777     }
2778   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2779     GstQuery *query = GST_QUERY_CAST (data);
2780
2781     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2782     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2783     GST_LOG_OBJECT (queue->srcpad, "Peered query");
2784     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2785         "did query %p, return %d", query, queue->last_query);
2786     g_cond_signal (&queue->query_handled);
2787     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2788     result = GST_FLOW_OK;
2789   }
2790   return result;
2791
2792   /* ERRORS */
2793 no_item:
2794   {
2795     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2796         "exit because we have no item in the queue");
2797     return GST_FLOW_ERROR;
2798   }
2799 out_flushing:
2800   {
2801     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2802     return GST_FLOW_FLUSHING;
2803   }
2804 }
2805
2806 /* called repeatedly with @pad as the source pad. This function should push out
2807  * data to the peer element. */
2808 static void
2809 gst_queue2_loop (GstPad * pad)
2810 {
2811   GstQueue2 *queue;
2812   GstFlowReturn ret;
2813
2814   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2815
2816   /* have to lock for thread-safety */
2817   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2818
2819   if (gst_queue2_is_empty (queue)) {
2820     gboolean started;
2821
2822     /* pause the timer while we wait. The fact that we are waiting does not mean
2823      * the byterate on the output pad is lower */
2824     if ((started = queue->out_timer_started))
2825       g_timer_stop (queue->out_timer);
2826
2827     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2828         "queue is empty, waiting for new data");
2829     do {
2830       /* Wait for data to be available, we could be unlocked because of a flush. */
2831       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2832     }
2833     while (gst_queue2_is_empty (queue));
2834
2835     /* and continue if we were running before */
2836     if (started)
2837       g_timer_continue (queue->out_timer);
2838   }
2839   ret = gst_queue2_push_one (queue);
2840   queue->srcresult = ret;
2841   queue->sinkresult = ret;
2842   if (ret != GST_FLOW_OK)
2843     goto out_flushing;
2844
2845   GST_QUEUE2_MUTEX_UNLOCK (queue);
2846   gst_queue2_post_buffering (queue);
2847
2848   return;
2849
2850   /* ERRORS */
2851 out_flushing:
2852   {
2853     gboolean eos = queue->is_eos;
2854     GstFlowReturn ret = queue->srcresult;
2855
2856     gst_pad_pause_task (queue->srcpad);
2857     if (ret == GST_FLOW_FLUSHING) {
2858       gst_queue2_locked_flush (queue, FALSE, FALSE);
2859     } else {
2860       GST_QUEUE2_SIGNAL_DEL (queue);
2861       queue->last_query = FALSE;
2862       g_cond_signal (&queue->query_handled);
2863     }
2864     GST_QUEUE2_MUTEX_UNLOCK (queue);
2865     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2866         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
2867     /* let app know about us giving up if upstream is not expected to do so */
2868     /* EOS is already taken care of elsewhere */
2869     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
2870       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2871           (_("Internal data flow error.")),
2872           ("streaming task paused, reason %s (%d)",
2873               gst_flow_get_name (ret), ret));
2874       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
2875     }
2876     return;
2877   }
2878 }
2879
2880 static gboolean
2881 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2882 {
2883   gboolean res = TRUE;
2884   GstQueue2 *queue = GST_QUEUE2 (parent);
2885
2886 #ifndef GST_DISABLE_GST_DEBUG
2887   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
2888       event, GST_EVENT_TYPE_NAME (event));
2889 #endif
2890
2891   switch (GST_EVENT_TYPE (event)) {
2892     case GST_EVENT_FLUSH_START:
2893       if (QUEUE_IS_USING_QUEUE (queue)) {
2894         /* just forward upstream */
2895         res = gst_pad_push_event (queue->sinkpad, event);
2896       } else {
2897         /* now unblock the getrange function */
2898         GST_QUEUE2_MUTEX_LOCK (queue);
2899         GST_DEBUG_OBJECT (queue, "flushing");
2900         queue->srcresult = GST_FLOW_FLUSHING;
2901         GST_QUEUE2_SIGNAL_ADD (queue);
2902         GST_QUEUE2_MUTEX_UNLOCK (queue);
2903
2904         /* when using a temp file, we eat the event */
2905         res = TRUE;
2906         gst_event_unref (event);
2907       }
2908       break;
2909     case GST_EVENT_FLUSH_STOP:
2910       if (QUEUE_IS_USING_QUEUE (queue)) {
2911         /* just forward upstream */
2912         res = gst_pad_push_event (queue->sinkpad, event);
2913       } else {
2914         /* now unblock the getrange function */
2915         GST_QUEUE2_MUTEX_LOCK (queue);
2916         queue->srcresult = GST_FLOW_OK;
2917         GST_QUEUE2_MUTEX_UNLOCK (queue);
2918
2919         /* when using a temp file, we eat the event */
2920         res = TRUE;
2921         gst_event_unref (event);
2922       }
2923       break;
2924     case GST_EVENT_RECONFIGURE:
2925       GST_QUEUE2_MUTEX_LOCK (queue);
2926       /* assume downstream is linked now and try to push again */
2927       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
2928         queue->srcresult = GST_FLOW_OK;
2929         queue->sinkresult = GST_FLOW_OK;
2930         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
2931           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
2932               NULL);
2933         }
2934       }
2935       GST_QUEUE2_MUTEX_UNLOCK (queue);
2936
2937       res = gst_pad_push_event (queue->sinkpad, event);
2938       break;
2939     default:
2940       res = gst_pad_push_event (queue->sinkpad, event);
2941       break;
2942   }
2943
2944   return res;
2945 }
2946
2947 static gboolean
2948 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2949 {
2950   GstQueue2 *queue;
2951
2952   queue = GST_QUEUE2 (parent);
2953
2954   switch (GST_QUERY_TYPE (query)) {
2955     case GST_QUERY_POSITION:
2956     {
2957       gint64 peer_pos;
2958       GstFormat format;
2959
2960       if (!gst_pad_peer_query (queue->sinkpad, query))
2961         goto peer_failed;
2962
2963       /* get peer position */
2964       gst_query_parse_position (query, &format, &peer_pos);
2965
2966       /* FIXME: this code assumes that there's no discont in the queue */
2967       switch (format) {
2968         case GST_FORMAT_BYTES:
2969           peer_pos -= queue->cur_level.bytes;
2970           break;
2971         case GST_FORMAT_TIME:
2972           peer_pos -= queue->cur_level.time;
2973           break;
2974         default:
2975           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
2976               "know how to adjust value", gst_format_get_name (format));
2977           return FALSE;
2978       }
2979       /* set updated position */
2980       gst_query_set_position (query, format, peer_pos);
2981       break;
2982     }
2983     case GST_QUERY_DURATION:
2984     {
2985       GST_DEBUG_OBJECT (queue, "doing peer query");
2986
2987       if (!gst_pad_peer_query (queue->sinkpad, query))
2988         goto peer_failed;
2989
2990       GST_DEBUG_OBJECT (queue, "peer query success");
2991       break;
2992     }
2993     case GST_QUERY_BUFFERING:
2994     {
2995       gint percent;
2996       gboolean is_buffering;
2997       GstBufferingMode mode;
2998       gint avg_in, avg_out;
2999       gint64 buffering_left;
3000
3001       GST_DEBUG_OBJECT (queue, "query buffering");
3002
3003       get_buffering_percent (queue, &is_buffering, &percent);
3004       gst_query_set_buffering_percent (query, is_buffering, percent);
3005
3006       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3007           &buffering_left);
3008       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3009           buffering_left);
3010
3011       if (!QUEUE_IS_USING_QUEUE (queue)) {
3012         /* add ranges for download and ringbuffer buffering */
3013         GstFormat format;
3014         gint64 start, stop, range_start, range_stop;
3015         guint64 writing_pos;
3016         gint64 estimated_total;
3017         gint64 duration;
3018         gboolean peer_res, is_eos;
3019         GstQueue2Range *queued_ranges;
3020
3021         /* we need a current download region */
3022         if (queue->current == NULL)
3023           return FALSE;
3024
3025         writing_pos = queue->current->writing_pos;
3026         is_eos = queue->is_eos;
3027
3028         if (is_eos) {
3029           /* we're EOS, we know the duration in bytes now */
3030           peer_res = TRUE;
3031           duration = writing_pos;
3032         } else {
3033           /* get duration of upstream in bytes */
3034           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3035               GST_FORMAT_BYTES, &duration);
3036         }
3037
3038         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3039             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3040
3041         /* calculate remaining and total download time */
3042         if (peer_res && avg_in > 0.0)
3043           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3044         else
3045           estimated_total = -1;
3046
3047         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3048             estimated_total);
3049
3050         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3051
3052         switch (format) {
3053           case GST_FORMAT_PERCENT:
3054             /* we need duration */
3055             if (!peer_res)
3056               goto peer_failed;
3057
3058             start = 0;
3059             /* get our available data relative to the duration */
3060             if (duration != -1)
3061               stop =
3062                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3063                   duration);
3064             else
3065               stop = -1;
3066             break;
3067           case GST_FORMAT_BYTES:
3068             start = 0;
3069             stop = writing_pos;
3070             break;
3071           default:
3072             start = -1;
3073             stop = -1;
3074             break;
3075         }
3076
3077         /* fill out the buffered ranges */
3078         for (queued_ranges = queue->ranges; queued_ranges;
3079             queued_ranges = queued_ranges->next) {
3080           switch (format) {
3081             case GST_FORMAT_PERCENT:
3082               if (duration == -1) {
3083                 range_start = 0;
3084                 range_stop = 0;
3085                 break;
3086               }
3087               range_start =
3088                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3089                   queued_ranges->offset, duration);
3090               range_stop =
3091                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3092                   queued_ranges->writing_pos, duration);
3093               break;
3094             case GST_FORMAT_BYTES:
3095               range_start = queued_ranges->offset;
3096               range_stop = queued_ranges->writing_pos;
3097               break;
3098             default:
3099               range_start = -1;
3100               range_stop = -1;
3101               break;
3102           }
3103           if (range_start == range_stop)
3104             continue;
3105           GST_DEBUG_OBJECT (queue,
3106               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3107               G_GINT64_FORMAT, range_start, range_stop);
3108           gst_query_add_buffering_range (query, range_start, range_stop);
3109         }
3110
3111         gst_query_set_buffering_range (query, format, start, stop,
3112             estimated_total);
3113       }
3114       break;
3115     }
3116     case GST_QUERY_SCHEDULING:
3117     {
3118       gboolean pull_mode;
3119       GstSchedulingFlags flags = 0;
3120
3121       if (!gst_pad_peer_query (queue->sinkpad, query))
3122         goto peer_failed;
3123
3124       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3125
3126       /* we can operate in pull mode when we are using a tempfile */
3127       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3128
3129       if (pull_mode)
3130         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3131       gst_query_set_scheduling (query, flags, 0, -1, 0);
3132       if (pull_mode)
3133         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3134       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3135       break;
3136     }
3137     default:
3138       /* peer handled other queries */
3139       if (!gst_pad_query_default (pad, parent, query))
3140         goto peer_failed;
3141       break;
3142   }
3143
3144   return TRUE;
3145
3146   /* ERRORS */
3147 peer_failed:
3148   {
3149     GST_DEBUG_OBJECT (queue, "failed peer query");
3150     return FALSE;
3151   }
3152 }
3153
3154 static gboolean
3155 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3156 {
3157   GstQueue2 *queue = GST_QUEUE2 (element);
3158
3159   /* simply forward to the srcpad query function */
3160   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3161       query);
3162 }
3163
3164 static void
3165 gst_queue2_update_upstream_size (GstQueue2 * queue)
3166 {
3167   gint64 upstream_size = -1;
3168
3169   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3170           &upstream_size)) {
3171     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3172
3173     /* upstream_size can be negative but queue->upstream_size is unsigned.
3174      * Prevent setting negative values to it (the query can return -1) */
3175     if (upstream_size >= 0)
3176       queue->upstream_size = upstream_size;
3177     else
3178       queue->upstream_size = 0;
3179   }
3180 }
3181
3182 static GstFlowReturn
3183 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3184     guint length, GstBuffer ** buffer)
3185 {
3186   GstQueue2 *queue;
3187   GstFlowReturn ret;
3188
3189   queue = GST_QUEUE2_CAST (parent);
3190
3191   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3192   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3193   offset = (offset == -1) ? queue->current->reading_pos : offset;
3194
3195   GST_DEBUG_OBJECT (queue,
3196       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3197
3198   /* catch any reads beyond the size of the file here to make sure queue2
3199    * doesn't send seek events beyond the size of the file upstream, since
3200    * that would confuse elements such as souphttpsrc and/or http servers.
3201    * Demuxers often just loop until EOS at the end of the file to figure out
3202    * when they've read all the end-headers or index chunks. */
3203   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3204     gst_queue2_update_upstream_size (queue);
3205     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3206       goto out_unexpected;
3207   }
3208
3209   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3210     gst_queue2_update_upstream_size (queue);
3211     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3212       length = queue->upstream_size - offset;
3213       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3214     }
3215   }
3216
3217   /* FIXME - function will block when the range is not yet available */
3218   ret = gst_queue2_create_read (queue, offset, length, buffer);
3219   GST_QUEUE2_MUTEX_UNLOCK (queue);
3220   gst_queue2_post_buffering (queue);
3221
3222   return ret;
3223
3224   /* ERRORS */
3225 out_flushing:
3226   {
3227     ret = queue->srcresult;
3228
3229     GST_DEBUG_OBJECT (queue, "we are flushing");
3230     GST_QUEUE2_MUTEX_UNLOCK (queue);
3231     return ret;
3232   }
3233 out_unexpected:
3234   {
3235     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3236     GST_QUEUE2_MUTEX_UNLOCK (queue);
3237     return GST_FLOW_EOS;
3238   }
3239 }
3240
3241 /* sink currently only operates in push mode */
3242 static gboolean
3243 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3244     GstPadMode mode, gboolean active)
3245 {
3246   gboolean result;
3247   GstQueue2 *queue;
3248
3249   queue = GST_QUEUE2 (parent);
3250
3251   switch (mode) {
3252     case GST_PAD_MODE_PUSH:
3253       if (active) {
3254         GST_QUEUE2_MUTEX_LOCK (queue);
3255         GST_DEBUG_OBJECT (queue, "activating push mode");
3256         queue->srcresult = GST_FLOW_OK;
3257         queue->sinkresult = GST_FLOW_OK;
3258         queue->is_eos = FALSE;
3259         queue->unexpected = FALSE;
3260         reset_rate_timer (queue);
3261         GST_QUEUE2_MUTEX_UNLOCK (queue);
3262       } else {
3263         /* unblock chain function */
3264         GST_QUEUE2_MUTEX_LOCK (queue);
3265         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3266         queue->srcresult = GST_FLOW_FLUSHING;
3267         queue->sinkresult = GST_FLOW_FLUSHING;
3268         GST_QUEUE2_SIGNAL_DEL (queue);
3269         /* Unblock query handler */
3270         queue->last_query = FALSE;
3271         g_cond_signal (&queue->query_handled);
3272         GST_QUEUE2_MUTEX_UNLOCK (queue);
3273
3274         /* wait until it is unblocked and clean up */
3275         GST_PAD_STREAM_LOCK (pad);
3276         GST_QUEUE2_MUTEX_LOCK (queue);
3277         gst_queue2_locked_flush (queue, TRUE, FALSE);
3278         GST_QUEUE2_MUTEX_UNLOCK (queue);
3279         GST_PAD_STREAM_UNLOCK (pad);
3280       }
3281       result = TRUE;
3282       break;
3283     default:
3284       result = FALSE;
3285       break;
3286   }
3287   return result;
3288 }
3289
3290 /* src operating in push mode, we start a task on the source pad that pushes out
3291  * buffers from the queue */
3292 static gboolean
3293 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3294 {
3295   gboolean result = FALSE;
3296   GstQueue2 *queue;
3297
3298   queue = GST_QUEUE2 (parent);
3299
3300   if (active) {
3301     GST_QUEUE2_MUTEX_LOCK (queue);
3302     GST_DEBUG_OBJECT (queue, "activating push mode");
3303     queue->srcresult = GST_FLOW_OK;
3304     queue->sinkresult = GST_FLOW_OK;
3305     queue->is_eos = FALSE;
3306     queue->unexpected = FALSE;
3307     result =
3308         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3309     GST_QUEUE2_MUTEX_UNLOCK (queue);
3310   } else {
3311     /* unblock loop function */
3312     GST_QUEUE2_MUTEX_LOCK (queue);
3313     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3314     queue->srcresult = GST_FLOW_FLUSHING;
3315     queue->sinkresult = GST_FLOW_FLUSHING;
3316     /* the item add signal will unblock */
3317     GST_QUEUE2_SIGNAL_ADD (queue);
3318     GST_QUEUE2_MUTEX_UNLOCK (queue);
3319
3320     /* step 2, make sure streaming finishes */
3321     result = gst_pad_stop_task (pad);
3322   }
3323
3324   return result;
3325 }
3326
3327 /* pull mode, downstream will call our getrange function */
3328 static gboolean
3329 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3330 {
3331   gboolean result;
3332   GstQueue2 *queue;
3333
3334   queue = GST_QUEUE2 (parent);
3335
3336   if (active) {
3337     GST_QUEUE2_MUTEX_LOCK (queue);
3338     if (!QUEUE_IS_USING_QUEUE (queue)) {
3339       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3340         /* open the temp file now */
3341         result = gst_queue2_open_temp_location_file (queue);
3342       } else if (!queue->ring_buffer) {
3343         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3344         result = ! !queue->ring_buffer;
3345       } else {
3346         result = TRUE;
3347       }
3348
3349       GST_DEBUG_OBJECT (queue, "activating pull mode");
3350       init_ranges (queue);
3351       queue->srcresult = GST_FLOW_OK;
3352       queue->sinkresult = GST_FLOW_OK;
3353       queue->is_eos = FALSE;
3354       queue->unexpected = FALSE;
3355       queue->upstream_size = 0;
3356     } else {
3357       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3358       /* this is not allowed, we cannot operate in pull mode without a temp
3359        * file. */
3360       queue->srcresult = GST_FLOW_FLUSHING;
3361       queue->sinkresult = GST_FLOW_FLUSHING;
3362       result = FALSE;
3363     }
3364     GST_QUEUE2_MUTEX_UNLOCK (queue);
3365   } else {
3366     GST_QUEUE2_MUTEX_LOCK (queue);
3367     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3368     queue->srcresult = GST_FLOW_FLUSHING;
3369     queue->sinkresult = GST_FLOW_FLUSHING;
3370     /* this will unlock getrange */
3371     GST_QUEUE2_SIGNAL_ADD (queue);
3372     result = TRUE;
3373     GST_QUEUE2_MUTEX_UNLOCK (queue);
3374   }
3375
3376   return result;
3377 }
3378
3379 static gboolean
3380 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3381     gboolean active)
3382 {
3383   gboolean res;
3384
3385   switch (mode) {
3386     case GST_PAD_MODE_PULL:
3387       res = gst_queue2_src_activate_pull (pad, parent, active);
3388       break;
3389     case GST_PAD_MODE_PUSH:
3390       res = gst_queue2_src_activate_push (pad, parent, active);
3391       break;
3392     default:
3393       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3394       res = FALSE;
3395       break;
3396   }
3397   return res;
3398 }
3399
3400 static GstStateChangeReturn
3401 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3402 {
3403   GstQueue2 *queue;
3404   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3405
3406   queue = GST_QUEUE2 (element);
3407
3408   switch (transition) {
3409     case GST_STATE_CHANGE_NULL_TO_READY:
3410       break;
3411     case GST_STATE_CHANGE_READY_TO_PAUSED:
3412       GST_QUEUE2_MUTEX_LOCK (queue);
3413       if (!QUEUE_IS_USING_QUEUE (queue)) {
3414         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3415           if (!gst_queue2_open_temp_location_file (queue))
3416             ret = GST_STATE_CHANGE_FAILURE;
3417         } else {
3418           if (queue->ring_buffer) {
3419             g_free (queue->ring_buffer);
3420             queue->ring_buffer = NULL;
3421           }
3422           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3423             ret = GST_STATE_CHANGE_FAILURE;
3424         }
3425         init_ranges (queue);
3426       }
3427       queue->segment_event_received = FALSE;
3428       queue->starting_segment = NULL;
3429       gst_event_replace (&queue->stream_start_event, NULL);
3430       GST_QUEUE2_MUTEX_UNLOCK (queue);
3431       break;
3432     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3433       break;
3434     default:
3435       break;
3436   }
3437
3438   if (ret == GST_STATE_CHANGE_FAILURE)
3439     return ret;
3440
3441   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3442
3443   if (ret == GST_STATE_CHANGE_FAILURE)
3444     return ret;
3445
3446   switch (transition) {
3447     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3448       break;
3449     case GST_STATE_CHANGE_PAUSED_TO_READY:
3450       GST_QUEUE2_MUTEX_LOCK (queue);
3451       if (!QUEUE_IS_USING_QUEUE (queue)) {
3452         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3453           gst_queue2_close_temp_location_file (queue);
3454         } else if (queue->ring_buffer) {
3455           g_free (queue->ring_buffer);
3456           queue->ring_buffer = NULL;
3457         }
3458         clean_ranges (queue);
3459       }
3460       if (queue->starting_segment != NULL) {
3461         gst_event_unref (queue->starting_segment);
3462         queue->starting_segment = NULL;
3463       }
3464       gst_event_replace (&queue->stream_start_event, NULL);
3465       GST_QUEUE2_MUTEX_UNLOCK (queue);
3466       break;
3467     case GST_STATE_CHANGE_READY_TO_NULL:
3468       break;
3469     default:
3470       break;
3471   }
3472
3473   return ret;
3474 }
3475
3476 /* changing the capacity of the queue must wake up
3477  * the _chain function, it might have more room now
3478  * to store the buffer/event in the queue */
3479 #define QUEUE_CAPACITY_CHANGE(q) \
3480   GST_QUEUE2_SIGNAL_DEL (queue); \
3481   if (queue->use_buffering)      \
3482     update_buffering (queue);
3483
3484 /* Changing the minimum required fill level must
3485  * wake up the _loop function as it might now
3486  * be able to preceed.
3487  */
3488 #define QUEUE_THRESHOLD_CHANGE(q)\
3489   GST_QUEUE2_SIGNAL_ADD (queue);
3490
3491 static void
3492 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3493 {
3494   GstState state;
3495
3496   /* the element must be stopped in order to do this */
3497   GST_OBJECT_LOCK (queue);
3498   state = GST_STATE (queue);
3499   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3500     goto wrong_state;
3501   GST_OBJECT_UNLOCK (queue);
3502
3503   /* set new location */
3504   g_free (queue->temp_template);
3505   queue->temp_template = g_strdup (template);
3506
3507   return;
3508
3509 /* ERROR */
3510 wrong_state:
3511   {
3512     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3513     GST_OBJECT_UNLOCK (queue);
3514   }
3515 }
3516
3517 static void
3518 gst_queue2_set_property (GObject * object,
3519     guint prop_id, const GValue * value, GParamSpec * pspec)
3520 {
3521   GstQueue2 *queue = GST_QUEUE2 (object);
3522
3523   /* someone could change levels here, and since this
3524    * affects the get/put funcs, we need to lock for safety. */
3525   GST_QUEUE2_MUTEX_LOCK (queue);
3526
3527   switch (prop_id) {
3528     case PROP_MAX_SIZE_BYTES:
3529       queue->max_level.bytes = g_value_get_uint (value);
3530       QUEUE_CAPACITY_CHANGE (queue);
3531       break;
3532     case PROP_MAX_SIZE_BUFFERS:
3533       queue->max_level.buffers = g_value_get_uint (value);
3534       QUEUE_CAPACITY_CHANGE (queue);
3535       break;
3536     case PROP_MAX_SIZE_TIME:
3537       queue->max_level.time = g_value_get_uint64 (value);
3538       /* set rate_time to the same value. We use an extra field in the level
3539        * structure so that we can easily access and compare it */
3540       queue->max_level.rate_time = queue->max_level.time;
3541       QUEUE_CAPACITY_CHANGE (queue);
3542       break;
3543     case PROP_USE_BUFFERING:
3544       queue->use_buffering = g_value_get_boolean (value);
3545       if (!queue->use_buffering && queue->is_buffering) {
3546         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3547             "posting 100%% message");
3548         SET_PERCENT (queue, 100);
3549         queue->is_buffering = FALSE;
3550       }
3551
3552       if (queue->use_buffering) {
3553         queue->is_buffering = TRUE;
3554         update_buffering (queue);
3555       }
3556       break;
3557     case PROP_USE_RATE_ESTIMATE:
3558       queue->use_rate_estimate = g_value_get_boolean (value);
3559       break;
3560     case PROP_LOW_PERCENT:
3561       queue->low_percent = g_value_get_int (value);
3562       break;
3563     case PROP_HIGH_PERCENT:
3564       queue->high_percent = g_value_get_int (value);
3565       break;
3566     case PROP_TEMP_TEMPLATE:
3567       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3568       break;
3569     case PROP_TEMP_REMOVE:
3570       queue->temp_remove = g_value_get_boolean (value);
3571       break;
3572     case PROP_RING_BUFFER_MAX_SIZE:
3573       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3574       break;
3575     default:
3576       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3577       break;
3578   }
3579
3580   GST_QUEUE2_MUTEX_UNLOCK (queue);
3581   gst_queue2_post_buffering (queue);
3582 }
3583
3584 static void
3585 gst_queue2_get_property (GObject * object,
3586     guint prop_id, GValue * value, GParamSpec * pspec)
3587 {
3588   GstQueue2 *queue = GST_QUEUE2 (object);
3589
3590   GST_QUEUE2_MUTEX_LOCK (queue);
3591
3592   switch (prop_id) {
3593     case PROP_CUR_LEVEL_BYTES:
3594       g_value_set_uint (value, queue->cur_level.bytes);
3595       break;
3596     case PROP_CUR_LEVEL_BUFFERS:
3597       g_value_set_uint (value, queue->cur_level.buffers);
3598       break;
3599     case PROP_CUR_LEVEL_TIME:
3600       g_value_set_uint64 (value, queue->cur_level.time);
3601       break;
3602     case PROP_MAX_SIZE_BYTES:
3603       g_value_set_uint (value, queue->max_level.bytes);
3604       break;
3605     case PROP_MAX_SIZE_BUFFERS:
3606       g_value_set_uint (value, queue->max_level.buffers);
3607       break;
3608     case PROP_MAX_SIZE_TIME:
3609       g_value_set_uint64 (value, queue->max_level.time);
3610       break;
3611     case PROP_USE_BUFFERING:
3612       g_value_set_boolean (value, queue->use_buffering);
3613       break;
3614     case PROP_USE_RATE_ESTIMATE:
3615       g_value_set_boolean (value, queue->use_rate_estimate);
3616       break;
3617     case PROP_LOW_PERCENT:
3618       g_value_set_int (value, queue->low_percent);
3619       break;
3620     case PROP_HIGH_PERCENT:
3621       g_value_set_int (value, queue->high_percent);
3622       break;
3623     case PROP_TEMP_TEMPLATE:
3624       g_value_set_string (value, queue->temp_template);
3625       break;
3626     case PROP_TEMP_LOCATION:
3627       g_value_set_string (value, queue->temp_location);
3628       break;
3629     case PROP_TEMP_REMOVE:
3630       g_value_set_boolean (value, queue->temp_remove);
3631       break;
3632     case PROP_RING_BUFFER_MAX_SIZE:
3633       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3634       break;
3635     case PROP_AVG_IN_RATE:
3636     {
3637       gdouble in_rate = queue->byte_in_rate;
3638
3639       /* During the first RATE_INTERVAL, byte_in_rate will not have been
3640        * calculated, so calculate it here. */
3641       if (in_rate == 0.0 && queue->bytes_in
3642           && queue->last_update_in_rates_elapsed > 0.0)
3643         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
3644
3645       g_value_set_int64 (value, (gint64) in_rate);
3646       break;
3647     }
3648     default:
3649       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3650       break;
3651   }
3652
3653   GST_QUEUE2_MUTEX_UNLOCK (queue);
3654 }