queue2: Add use-tags-bitrate property
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue2:current-level-buffers property.
40  *
41  * The default queue size limits are 100 buffers, 2MB of data, or
42  * two seconds worth of data, whichever is reached first.
43  *
44  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
45  * will allocate a random free filename and buffer data in the file.
46  * By using this, it will buffer the entire stream data on the file independently
47  * of the queue size limits, they will only be used for buffering statistics.
48  *
49  * The temp-location property will be used to notify the application of the
50  * allocated filename.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56
57 #include "gstqueue2.h"
58
59 #include <glib/gstdio.h>
60
61 #include "gst/gst-i18n-lib.h"
62 #include "gst/glib-compat-private.h"
63
64 #include <string.h>
65
66 #ifdef G_OS_WIN32
67 #include <io.h>                 /* lseek, open, close, read */
68 #undef lseek
69 #define lseek _lseeki64
70 #undef off_t
71 #define off_t guint64
72 #else
73 #include <unistd.h>
74 #endif
75
76 #ifdef __BIONIC__               /* Android */
77 #undef lseek
78 #define lseek lseek64
79 #undef off_t
80 #define off_t guint64
81 #include <fcntl.h>
82 #endif
83
84 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
85     GST_PAD_SINK,
86     GST_PAD_ALWAYS,
87     GST_STATIC_CAPS_ANY);
88
89 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
90     GST_PAD_SRC,
91     GST_PAD_ALWAYS,
92     GST_STATIC_CAPS_ANY);
93
94 GST_DEBUG_CATEGORY_STATIC (queue_debug);
95 #define GST_CAT_DEFAULT (queue_debug)
96 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
97
98 enum
99 {
100   SIGNAL_OVERRUN,
101   LAST_SIGNAL
102 };
103
104 /* other defines */
105 #define DEFAULT_BUFFER_SIZE 4096
106 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
107 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
108 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
109
110 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
111
112 /* default property values */
113 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
114 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
115 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
116 #define DEFAULT_USE_BUFFERING      FALSE
117 #define DEFAULT_USE_TAGS_BITRATE   FALSE
118 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
119 #define DEFAULT_LOW_PERCENT        10
120 #define DEFAULT_HIGH_PERCENT       99
121 #define DEFAULT_TEMP_REMOVE        TRUE
122 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
123
124 enum
125 {
126   PROP_0,
127   PROP_CUR_LEVEL_BUFFERS,
128   PROP_CUR_LEVEL_BYTES,
129   PROP_CUR_LEVEL_TIME,
130   PROP_MAX_SIZE_BUFFERS,
131   PROP_MAX_SIZE_BYTES,
132   PROP_MAX_SIZE_TIME,
133   PROP_USE_BUFFERING,
134   PROP_USE_TAGS_BITRATE,
135   PROP_USE_RATE_ESTIMATE,
136   PROP_LOW_PERCENT,
137   PROP_HIGH_PERCENT,
138   PROP_TEMP_TEMPLATE,
139   PROP_TEMP_LOCATION,
140   PROP_TEMP_REMOVE,
141   PROP_RING_BUFFER_MAX_SIZE,
142   PROP_AVG_IN_RATE,
143   PROP_LAST
144 };
145
146 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
147   l.buffers = 0;                                        \
148   l.bytes = 0;                                          \
149   l.time = 0;                                           \
150   l.rate_time = 0;                                      \
151 } G_STMT_END
152
153 #define STATUS(queue, pad, msg) \
154   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
155                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
156                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
157                       " ns, %"G_GUINT64_FORMAT" items", \
158                       GST_DEBUG_PAD_NAME (pad), \
159                       queue->cur_level.buffers, \
160                       queue->max_level.buffers, \
161                       queue->cur_level.bytes, \
162                       queue->max_level.bytes, \
163                       queue->cur_level.time, \
164                       queue->max_level.time, \
165                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
166                         queue->current->writing_pos - queue->current->max_reading_pos : \
167                         queue->queue.length))
168
169 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
170   g_mutex_lock (&q->qlock);                                              \
171 } G_STMT_END
172
173 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
174   GST_QUEUE2_MUTEX_LOCK (q);                                            \
175   if (res != GST_FLOW_OK)                                               \
176     goto label;                                                         \
177 } G_STMT_END
178
179 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
180   g_mutex_unlock (&q->qlock);                                            \
181 } G_STMT_END
182
183 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
184   STATUS (queue, q->sinkpad, "wait for DEL");                           \
185   q->waiting_del = TRUE;                                                \
186   g_cond_wait (&q->item_del, &queue->qlock);                              \
187   q->waiting_del = FALSE;                                               \
188   if (res != GST_FLOW_OK) {                                             \
189     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
190     goto label;                                                         \
191   }                                                                     \
192   STATUS (queue, q->sinkpad, "received DEL");                           \
193 } G_STMT_END
194
195 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
196   STATUS (queue, q->srcpad, "wait for ADD");                            \
197   q->waiting_add = TRUE;                                                \
198   g_cond_wait (&q->item_add, &q->qlock);                                  \
199   q->waiting_add = FALSE;                                               \
200   if (res != GST_FLOW_OK) {                                             \
201     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
202     goto label;                                                         \
203   }                                                                     \
204   STATUS (queue, q->srcpad, "received ADD");                            \
205 } G_STMT_END
206
207 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
208   if (q->waiting_del) {                                                 \
209     STATUS (q, q->srcpad, "signal DEL");                                \
210     g_cond_signal (&q->item_del);                                        \
211   }                                                                     \
212 } G_STMT_END
213
214 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
215   if (q->waiting_add) {                                                 \
216     STATUS (q, q->sinkpad, "signal ADD");                               \
217     g_cond_signal (&q->item_add);                                        \
218   }                                                                     \
219 } G_STMT_END
220
221 #define SET_PERCENT(q, perc) G_STMT_START {                              \
222   if (perc != q->buffering_percent) {                                    \
223     q->buffering_percent = perc;                                         \
224     q->percent_changed = TRUE;                                           \
225     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
226     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
227         &q->buffering_left);                                             \
228   }                                                                      \
229 } G_STMT_END
230
231 #define _do_init \
232     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
233     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
234         "dataflow inside the queue element");
235 #define gst_queue2_parent_class parent_class
236 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
237
238 static void gst_queue2_finalize (GObject * object);
239
240 static void gst_queue2_set_property (GObject * object,
241     guint prop_id, const GValue * value, GParamSpec * pspec);
242 static void gst_queue2_get_property (GObject * object,
243     guint prop_id, GValue * value, GParamSpec * pspec);
244
245 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
246     GstBuffer * buffer);
247 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
248     GstBufferList * buffer_list);
249 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
250 static void gst_queue2_loop (GstPad * pad);
251
252 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
253     GstEvent * event);
254 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
255     GstQuery * query);
256
257 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
258     GstEvent * event);
259 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
260     GstQuery * query);
261 static gboolean gst_queue2_handle_query (GstElement * element,
262     GstQuery * query);
263
264 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
265     guint64 offset, guint length, GstBuffer ** buffer);
266
267 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
268     GstPadMode mode, gboolean active);
269 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
270     GstPadMode mode, gboolean active);
271 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
272     GstStateChange transition);
273
274 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
275 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
276
277 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
278 static void update_in_rates (GstQueue2 * queue);
279 static void gst_queue2_post_buffering (GstQueue2 * queue);
280
281 typedef enum
282 {
283   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
284   GST_QUEUE2_ITEM_TYPE_BUFFER,
285   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
286   GST_QUEUE2_ITEM_TYPE_EVENT,
287   GST_QUEUE2_ITEM_TYPE_QUERY
288 } GstQueue2ItemType;
289
290 typedef struct
291 {
292   GstQueue2ItemType type;
293   GstMiniObject *item;
294 } GstQueue2Item;
295
296 static guint gst_queue2_signals[LAST_SIGNAL] = { 0 };
297
298 static void
299 gst_queue2_class_init (GstQueue2Class * klass)
300 {
301   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
302   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
303
304   gobject_class->set_property = gst_queue2_set_property;
305   gobject_class->get_property = gst_queue2_get_property;
306
307   /* signals */
308   /**
309    * GstQueue2::overrun:
310    * @queue: the queue2 instance
311    *
312    * Reports that the buffer became full (overrun).
313    * A buffer is full if the total amount of data inside it (num-buffers, time,
314    * size) is higher than the boundary values which can be set through the
315    * GObject properties.
316    *
317    * Since: 1.8
318    */
319   gst_queue2_signals[SIGNAL_OVERRUN] =
320       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
321       G_STRUCT_OFFSET (GstQueue2Class, overrun), NULL, NULL,
322       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
323
324   /* properties */
325   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
326       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
327           "Current amount of data in the queue (bytes)",
328           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
329   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
330       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
331           "Current number of buffers in the queue",
332           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
333   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
334       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
335           "Current amount of data in the queue (in ns)",
336           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
337
338   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
339       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
340           "Max. amount of data in the queue (bytes, 0=disable)",
341           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
342           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
343           G_PARAM_STATIC_STRINGS));
344   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
345       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
346           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
347           DEFAULT_MAX_SIZE_BUFFERS,
348           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
349           G_PARAM_STATIC_STRINGS));
350   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
351       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
352           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
353           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
354           G_PARAM_STATIC_STRINGS));
355
356   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
357       g_param_spec_boolean ("use-buffering", "Use buffering",
358           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
359           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
360           G_PARAM_STATIC_STRINGS));
361   g_object_class_install_property (gobject_class, PROP_USE_TAGS_BITRATE,
362       g_param_spec_boolean ("use-tags-bitrate", "Use bitrate from tags",
363           "Use a bitrate from upstream tags to estimate buffer duration if not provided",
364           DEFAULT_USE_TAGS_BITRATE,
365           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
366           G_PARAM_STATIC_STRINGS));
367   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
368       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
369           "Estimate the bitrate of the stream to calculate time level",
370           DEFAULT_USE_RATE_ESTIMATE,
371           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
372   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
373       g_param_spec_int ("low-percent", "Low percent",
374           "Low threshold for buffering to start. Only used if use-buffering is True",
375           0, 100, DEFAULT_LOW_PERCENT,
376           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
377   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
378       g_param_spec_int ("high-percent", "High percent",
379           "High threshold for buffering to finish. Only used if use-buffering is True",
380           0, 100, DEFAULT_HIGH_PERCENT,
381           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
382
383   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
384       g_param_spec_string ("temp-template", "Temporary File Template",
385           "File template to store temporary files in, should contain directory "
386           "and XXXXXX. (NULL == disabled)",
387           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
388
389   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
390       g_param_spec_string ("temp-location", "Temporary File Location",
391           "Location to store temporary files in (Only read this property, "
392           "use temp-template to configure the name template)",
393           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
394
395   /**
396    * GstQueue2:temp-remove
397    *
398    * When temp-template is set, remove the temporary file when going to READY.
399    */
400   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
401       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
402           "Remove the temp-location after use",
403           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404
405   /**
406    * GstQueue2:ring-buffer-max-size
407    *
408    * The maximum size of the ring buffer in bytes. If set to 0, the ring
409    * buffer is disabled. Default 0.
410    */
411   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
412       g_param_spec_uint64 ("ring-buffer-max-size",
413           "Max. ring buffer size (bytes)",
414           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
415           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
416           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
417
418   /**
419    * GstQueue2:avg-in-rate
420    *
421    * The average input data rate.
422    */
423   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
424       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
425           "Average input data rate (bytes/s)",
426           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
427
428   /* set several parent class virtual functions */
429   gobject_class->finalize = gst_queue2_finalize;
430
431   gst_element_class_add_pad_template (gstelement_class,
432       gst_static_pad_template_get (&srctemplate));
433   gst_element_class_add_pad_template (gstelement_class,
434       gst_static_pad_template_get (&sinktemplate));
435
436   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
437       "Generic",
438       "Simple data queue",
439       "Erik Walthinsen <omega@cse.ogi.edu>, "
440       "Wim Taymans <wim.taymans@gmail.com>");
441
442   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
443   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
444 }
445
446 static void
447 gst_queue2_init (GstQueue2 * queue)
448 {
449   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
450
451   gst_pad_set_chain_function (queue->sinkpad,
452       GST_DEBUG_FUNCPTR (gst_queue2_chain));
453   gst_pad_set_chain_list_function (queue->sinkpad,
454       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
455   gst_pad_set_activatemode_function (queue->sinkpad,
456       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
457   gst_pad_set_event_function (queue->sinkpad,
458       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
459   gst_pad_set_query_function (queue->sinkpad,
460       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
461   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
462   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
463
464   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
465
466   gst_pad_set_activatemode_function (queue->srcpad,
467       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
468   gst_pad_set_getrange_function (queue->srcpad,
469       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
470   gst_pad_set_event_function (queue->srcpad,
471       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
472   gst_pad_set_query_function (queue->srcpad,
473       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
474   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
475   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
476
477   /* levels */
478   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
479   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
480   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
481   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
482   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
483   queue->use_buffering = DEFAULT_USE_BUFFERING;
484   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
485   queue->low_percent = DEFAULT_LOW_PERCENT;
486   queue->high_percent = DEFAULT_HIGH_PERCENT;
487
488   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
489   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
490
491   queue->sinktime = GST_CLOCK_TIME_NONE;
492   queue->srctime = GST_CLOCK_TIME_NONE;
493   queue->sink_tainted = TRUE;
494   queue->src_tainted = TRUE;
495
496   queue->srcresult = GST_FLOW_FLUSHING;
497   queue->sinkresult = GST_FLOW_FLUSHING;
498   queue->is_eos = FALSE;
499   queue->in_timer = g_timer_new ();
500   queue->out_timer = g_timer_new ();
501
502   g_mutex_init (&queue->qlock);
503   queue->waiting_add = FALSE;
504   g_cond_init (&queue->item_add);
505   queue->waiting_del = FALSE;
506   g_cond_init (&queue->item_del);
507   g_queue_init (&queue->queue);
508
509   g_cond_init (&queue->query_handled);
510   queue->last_query = FALSE;
511
512   g_mutex_init (&queue->buffering_post_lock);
513   queue->buffering_percent = 100;
514
515   /* tempfile related */
516   queue->temp_template = NULL;
517   queue->temp_location = NULL;
518   queue->temp_remove = DEFAULT_TEMP_REMOVE;
519
520   queue->ring_buffer = NULL;
521   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
522
523   GST_DEBUG_OBJECT (queue,
524       "initialized queue's not_empty & not_full conditions");
525 }
526
527 /* called only once, as opposed to dispose */
528 static void
529 gst_queue2_finalize (GObject * object)
530 {
531   GstQueue2 *queue = GST_QUEUE2 (object);
532
533   GST_DEBUG_OBJECT (queue, "finalizing queue");
534
535   while (!g_queue_is_empty (&queue->queue)) {
536     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
537
538     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
539       gst_mini_object_unref (qitem->item);
540     g_slice_free (GstQueue2Item, qitem);
541   }
542
543   queue->last_query = FALSE;
544   g_queue_clear (&queue->queue);
545   g_mutex_clear (&queue->qlock);
546   g_mutex_clear (&queue->buffering_post_lock);
547   g_cond_clear (&queue->item_add);
548   g_cond_clear (&queue->item_del);
549   g_cond_clear (&queue->query_handled);
550   g_timer_destroy (queue->in_timer);
551   g_timer_destroy (queue->out_timer);
552
553   /* temp_file path cleanup  */
554   g_free (queue->temp_template);
555   g_free (queue->temp_location);
556
557   G_OBJECT_CLASS (parent_class)->finalize (object);
558 }
559
560 static void
561 debug_ranges (GstQueue2 * queue)
562 {
563   GstQueue2Range *walk;
564
565   for (walk = queue->ranges; walk; walk = walk->next) {
566     GST_DEBUG_OBJECT (queue,
567         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
568         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
569         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
570         walk->rb_writing_pos, walk->reading_pos,
571         walk == queue->current ? "**y**" : "  n  ");
572   }
573 }
574
575 /* clear all the downloaded ranges */
576 static void
577 clean_ranges (GstQueue2 * queue)
578 {
579   GST_DEBUG_OBJECT (queue, "clean queue ranges");
580
581   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
582   queue->ranges = NULL;
583   queue->current = NULL;
584 }
585
586 /* find a range that contains @offset or NULL when nothing does */
587 static GstQueue2Range *
588 find_range (GstQueue2 * queue, guint64 offset)
589 {
590   GstQueue2Range *range = NULL;
591   GstQueue2Range *walk;
592
593   /* first do a quick check for the current range */
594   for (walk = queue->ranges; walk; walk = walk->next) {
595     if (offset >= walk->offset && offset <= walk->writing_pos) {
596       /* we can reuse an existing range */
597       range = walk;
598       break;
599     }
600   }
601   if (range) {
602     GST_DEBUG_OBJECT (queue,
603         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
604         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
605   } else {
606     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
607   }
608   return range;
609 }
610
611 static void
612 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
613 {
614   guint64 max_reading_pos, writing_pos;
615
616   writing_pos = range->writing_pos;
617   max_reading_pos = range->max_reading_pos;
618
619   if (writing_pos > max_reading_pos)
620     queue->cur_level.bytes = writing_pos - max_reading_pos;
621   else
622     queue->cur_level.bytes = 0;
623 }
624
625 /* make a new range for @offset or reuse an existing range */
626 static GstQueue2Range *
627 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
628 {
629   GstQueue2Range *range, *prev, *next;
630
631   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
632
633   if ((range = find_range (queue, offset))) {
634     GST_DEBUG_OBJECT (queue,
635         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
636         range->writing_pos);
637     if (update_existing && range->writing_pos != offset) {
638       GST_DEBUG_OBJECT (queue, "updating range writing position to "
639           "%" G_GUINT64_FORMAT, offset);
640       range->writing_pos = offset;
641     }
642   } else {
643     GST_DEBUG_OBJECT (queue,
644         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
645
646     range = g_slice_new0 (GstQueue2Range);
647     range->offset = offset;
648     /* we want to write to the next location in the ring buffer */
649     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
650     range->writing_pos = offset;
651     range->rb_writing_pos = range->rb_offset;
652     range->reading_pos = offset;
653     range->max_reading_pos = offset;
654
655     /* insert sorted */
656     prev = NULL;
657     next = queue->ranges;
658     while (next) {
659       if (next->offset > offset) {
660         /* insert before next */
661         GST_DEBUG_OBJECT (queue,
662             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
663             next->offset);
664         break;
665       }
666       /* try next */
667       prev = next;
668       next = next->next;
669     }
670     range->next = next;
671     if (prev)
672       prev->next = range;
673     else
674       queue->ranges = range;
675   }
676   debug_ranges (queue);
677
678   /* update the stats for this range */
679   update_cur_level (queue, range);
680
681   return range;
682 }
683
684
685 /* clear and init the download ranges for offset 0 */
686 static void
687 init_ranges (GstQueue2 * queue)
688 {
689   GST_DEBUG_OBJECT (queue, "init queue ranges");
690
691   /* get rid of all the current ranges */
692   clean_ranges (queue);
693   /* make a range for offset 0 */
694   queue->current = add_range (queue, 0, TRUE);
695 }
696
697 /* calculate the diff between running time on the sink and src of the queue.
698  * This is the total amount of time in the queue. */
699 static void
700 update_time_level (GstQueue2 * queue)
701 {
702   if (queue->sink_tainted) {
703     queue->sinktime =
704         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
705         queue->sink_segment.position);
706     queue->sink_tainted = FALSE;
707   }
708
709   if (queue->src_tainted) {
710     queue->srctime =
711         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
712         queue->src_segment.position);
713     queue->src_tainted = FALSE;
714   }
715
716   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
717       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
718
719   if (queue->sinktime != GST_CLOCK_TIME_NONE
720       && queue->srctime != GST_CLOCK_TIME_NONE
721       && queue->sinktime >= queue->srctime)
722     queue->cur_level.time = queue->sinktime - queue->srctime;
723   else
724     queue->cur_level.time = 0;
725 }
726
727 /* take a SEGMENT event and apply the values to segment, updating the time
728  * level of queue. */
729 static void
730 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
731     gboolean is_sink)
732 {
733   gst_event_copy_segment (event, segment);
734
735   if (segment->format == GST_FORMAT_BYTES) {
736     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
737       /* start is where we'll be getting from and as such writing next */
738       queue->current = add_range (queue, segment->start, TRUE);
739     }
740   }
741
742   /* now configure the values, we use these to track timestamps on the
743    * sinkpad. */
744   if (segment->format != GST_FORMAT_TIME) {
745     /* non-time format, pretend the current time segment is closed with a
746      * 0 start and unknown stop time. */
747     segment->format = GST_FORMAT_TIME;
748     segment->start = 0;
749     segment->stop = -1;
750     segment->time = 0;
751   }
752
753   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
754
755   if (is_sink)
756     queue->sink_tainted = TRUE;
757   else
758     queue->src_tainted = TRUE;
759
760   /* segment can update the time level of the queue */
761   update_time_level (queue);
762 }
763
764 static void
765 apply_gap (GstQueue2 * queue, GstEvent * event,
766     GstSegment * segment, gboolean is_sink)
767 {
768   GstClockTime timestamp;
769   GstClockTime duration;
770
771   gst_event_parse_gap (event, &timestamp, &duration);
772
773   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
774
775     if (GST_CLOCK_TIME_IS_VALID (duration)) {
776       timestamp += duration;
777     }
778
779     segment->position = timestamp;
780
781     if (is_sink)
782       queue->sink_tainted = TRUE;
783     else
784       queue->src_tainted = TRUE;
785
786     /* calc diff with other end */
787     update_time_level (queue);
788   }
789 }
790
791 /* take a buffer and update segment, updating the time level of the queue. */
792 static void
793 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
794     guint64 size, gboolean is_sink)
795 {
796   GstClockTime duration, timestamp;
797
798   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
799   duration = GST_BUFFER_DURATION (buffer);
800
801   /* If we have no duration, pick one from the bitrate if we can */
802   if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) {
803     guint bitrate =
804         is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
805     if (bitrate)
806       duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
807   }
808
809   /* if no timestamp is set, assume it's continuous with the previous
810    * time */
811   if (timestamp == GST_CLOCK_TIME_NONE)
812     timestamp = segment->position;
813
814   /* add duration */
815   if (duration != GST_CLOCK_TIME_NONE)
816     timestamp += duration;
817
818   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
819       GST_TIME_ARGS (timestamp));
820
821   segment->position = timestamp;
822
823   if (is_sink)
824     queue->sink_tainted = TRUE;
825   else
826     queue->src_tainted = TRUE;
827
828   /* calc diff with other end */
829   update_time_level (queue);
830 }
831
832 struct BufListData
833 {
834   GstClockTime timestamp;
835   guint bitrate;
836 };
837
838 static gboolean
839 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
840 {
841   struct BufListData *bld = data;
842   GstClockTime *timestamp = &bld->timestamp;
843   GstClockTime btime;
844
845   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
846       " duration %" GST_TIME_FORMAT, idx,
847       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
848       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
849       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
850
851   btime = GST_BUFFER_DTS_OR_PTS (*buf);
852   if (GST_CLOCK_TIME_IS_VALID (btime))
853     *timestamp = btime;
854
855   if (GST_BUFFER_DURATION_IS_VALID (*buf))
856     *timestamp += GST_BUFFER_DURATION (*buf);
857   else if (bld->bitrate != 0) {
858     guint64 size = gst_buffer_get_size (*buf);
859
860     /* If we have no duration, pick one from the bitrate if we can */
861     *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size);
862   }
863
864
865   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
866   return TRUE;
867 }
868
869 /* take a buffer list and update segment, updating the time level of the queue */
870 static void
871 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
872     GstSegment * segment, gboolean is_sink)
873 {
874   struct BufListData bld;
875
876   /* if no timestamp is set, assume it's continuous with the previous time */
877   bld.timestamp = segment->position;
878
879   if (queue->use_tags_bitrate) {
880     if (is_sink)
881       bld.bitrate = queue->sink_tags_bitrate;
882     else
883       bld.bitrate = queue->src_tags_bitrate;
884   } else
885     bld.bitrate = 0;
886
887   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
888
889   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
890       GST_TIME_ARGS (bld.timestamp));
891
892   segment->position = bld.timestamp;
893
894   if (is_sink)
895     queue->sink_tainted = TRUE;
896   else
897     queue->src_tainted = TRUE;
898
899   /* calc diff with other end */
900   update_time_level (queue);
901 }
902
903 static inline gint
904 get_percent (guint64 cur_level, guint64 max_level, guint64 alt_max)
905 {
906   guint64 p;
907
908   if (max_level == 0)
909     return 0;
910
911   if (alt_max > 0)
912     p = gst_util_uint64_scale (cur_level, 100, MIN (max_level, alt_max));
913   else
914     p = gst_util_uint64_scale (cur_level, 100, max_level);
915
916   return MIN (p, 100);
917 }
918
919 static gboolean
920 get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering,
921     gint * percent)
922 {
923   gint perc, perc2;
924
925   if (queue->high_percent <= 0) {
926     if (percent)
927       *percent = 100;
928     if (is_buffering)
929       *is_buffering = FALSE;
930     return FALSE;
931   }
932 #define GET_PERCENT(format,alt_max) \
933     get_percent(queue->cur_level.format,queue->max_level.format,(alt_max))
934
935   if (queue->is_eos) {
936     /* on EOS we are always 100% full, we set the var here so that it we can
937      * reuse the logic below to stop buffering */
938     perc = 100;
939     GST_LOG_OBJECT (queue, "we are EOS");
940   } else {
941     GST_LOG_OBJECT (queue,
942         "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
943         queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
944         queue->cur_level.buffers);
945
946     /* figure out the percent we are filled, we take the max of all formats. */
947     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
948       perc = GET_PERCENT (bytes, 0);
949     } else {
950       guint64 rb_size = queue->ring_buffer_max_size;
951       perc = GET_PERCENT (bytes, rb_size);
952     }
953
954     perc2 = GET_PERCENT (time, 0);
955     perc = MAX (perc, perc2);
956
957     perc2 = GET_PERCENT (buffers, 0);
958     perc = MAX (perc, perc2);
959
960     /* also apply the rate estimate when we need to */
961     if (queue->use_rate_estimate) {
962       perc2 = GET_PERCENT (rate_time, 0);
963       perc = MAX (perc, perc2);
964     }
965
966     /* Don't get to 0% unless we're really empty */
967     if (queue->cur_level.bytes > 0)
968       perc = MAX (1, perc);
969   }
970 #undef GET_PERCENT
971
972   if (is_buffering)
973     *is_buffering = queue->is_buffering;
974
975   /* scale to high percent so that it becomes the 100% mark */
976   perc = perc * 100 / queue->high_percent;
977   /* clip */
978   if (perc > 100)
979     perc = 100;
980
981   if (percent)
982     *percent = perc;
983
984   GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering,
985       perc);
986
987   return TRUE;
988 }
989
990 static void
991 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
992     gint * avg_in, gint * avg_out, gint64 * buffering_left)
993 {
994   if (mode) {
995     if (!QUEUE_IS_USING_QUEUE (queue)) {
996       if (QUEUE_IS_USING_RING_BUFFER (queue))
997         *mode = GST_BUFFERING_TIMESHIFT;
998       else
999         *mode = GST_BUFFERING_DOWNLOAD;
1000     } else {
1001       *mode = GST_BUFFERING_STREAM;
1002     }
1003   }
1004
1005   if (avg_in)
1006     *avg_in = queue->byte_in_rate;
1007   if (avg_out)
1008     *avg_out = queue->byte_out_rate;
1009
1010   if (buffering_left) {
1011     *buffering_left = (percent == 100 ? 0 : -1);
1012
1013     if (queue->use_rate_estimate) {
1014       guint64 max, cur;
1015
1016       max = queue->max_level.rate_time;
1017       cur = queue->cur_level.rate_time;
1018
1019       if (percent != 100 && max > cur)
1020         *buffering_left = (max - cur) / 1000000;
1021     }
1022   }
1023 }
1024
1025 static void
1026 gst_queue2_post_buffering (GstQueue2 * queue)
1027 {
1028   GstMessage *msg = NULL;
1029
1030   g_mutex_lock (&queue->buffering_post_lock);
1031   GST_QUEUE2_MUTEX_LOCK (queue);
1032   if (queue->percent_changed) {
1033     gint percent = queue->buffering_percent;
1034
1035     queue->percent_changed = FALSE;
1036
1037     GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
1038     msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
1039
1040     gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
1041         queue->avg_out, queue->buffering_left);
1042   }
1043   GST_QUEUE2_MUTEX_UNLOCK (queue);
1044
1045   if (msg != NULL)
1046     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
1047
1048   g_mutex_unlock (&queue->buffering_post_lock);
1049 }
1050
1051 static void
1052 update_buffering (GstQueue2 * queue)
1053 {
1054   gint percent;
1055
1056   /* Ensure the variables used to calculate buffering state are up-to-date. */
1057   if (queue->current)
1058     update_cur_level (queue, queue->current);
1059   update_in_rates (queue);
1060
1061   if (!get_buffering_percent (queue, NULL, &percent))
1062     return;
1063
1064   if (queue->is_buffering) {
1065     /* if we were buffering see if we reached the high watermark */
1066     if (percent >= 100)
1067       queue->is_buffering = FALSE;
1068
1069     SET_PERCENT (queue, percent);
1070   } else {
1071     /* we were not buffering, check if we need to start buffering if we drop
1072      * below the low threshold */
1073     if (percent < queue->low_percent) {
1074       queue->is_buffering = TRUE;
1075       SET_PERCENT (queue, percent);
1076     }
1077   }
1078 }
1079
1080 static void
1081 reset_rate_timer (GstQueue2 * queue)
1082 {
1083   queue->bytes_in = 0;
1084   queue->bytes_out = 0;
1085   queue->byte_in_rate = 0.0;
1086   queue->byte_in_period = 0;
1087   queue->byte_out_rate = 0.0;
1088   queue->last_update_in_rates_elapsed = 0.0;
1089   queue->last_in_elapsed = 0.0;
1090   queue->last_out_elapsed = 0.0;
1091   queue->in_timer_started = FALSE;
1092   queue->out_timer_started = FALSE;
1093 }
1094
1095 /* the interval in seconds to recalculate the rate */
1096 #define RATE_INTERVAL    0.2
1097 /* Tuning for rate estimation. We use a large window for the input rate because
1098  * it should be stable when connected to a network. The output rate is less
1099  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1100  * therefore adapt more quickly.
1101  * However, initial input rate may be subject to a burst, and should therefore
1102  * initially also adapt more quickly to changes, and only later on give higher
1103  * weight to previous values. */
1104 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1105 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1106
1107 static void
1108 update_in_rates (GstQueue2 * queue)
1109 {
1110   gdouble elapsed, period;
1111   gdouble byte_in_rate;
1112
1113   if (!queue->in_timer_started) {
1114     queue->in_timer_started = TRUE;
1115     g_timer_start (queue->in_timer);
1116     return;
1117   }
1118
1119   queue->last_update_in_rates_elapsed = elapsed =
1120       g_timer_elapsed (queue->in_timer, NULL);
1121
1122   /* recalc after each interval. */
1123   if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1124     period = elapsed - queue->last_in_elapsed;
1125
1126     GST_DEBUG_OBJECT (queue,
1127         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1128         period, queue->bytes_in, queue->byte_in_period);
1129
1130     byte_in_rate = queue->bytes_in / period;
1131
1132     if (queue->byte_in_rate == 0.0)
1133       queue->byte_in_rate = byte_in_rate;
1134     else
1135       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1136           (double) queue->byte_in_period, period);
1137
1138     /* another data point, cap at 16 for long time running average */
1139     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1140       queue->byte_in_period += period;
1141
1142     /* reset the values to calculate rate over the next interval */
1143     queue->last_in_elapsed = elapsed;
1144     queue->bytes_in = 0;
1145   }
1146
1147   if (queue->byte_in_rate > 0.0) {
1148     queue->cur_level.rate_time =
1149         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1150   }
1151   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1152       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1153 }
1154
1155 static void
1156 update_out_rates (GstQueue2 * queue)
1157 {
1158   gdouble elapsed, period;
1159   gdouble byte_out_rate;
1160
1161   if (!queue->out_timer_started) {
1162     queue->out_timer_started = TRUE;
1163     g_timer_start (queue->out_timer);
1164     return;
1165   }
1166
1167   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1168
1169   /* recalc after each interval. */
1170   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1171     period = elapsed - queue->last_out_elapsed;
1172
1173     GST_DEBUG_OBJECT (queue,
1174         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1175
1176     byte_out_rate = queue->bytes_out / period;
1177
1178     if (queue->byte_out_rate == 0.0)
1179       queue->byte_out_rate = byte_out_rate;
1180     else
1181       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1182
1183     /* reset the values to calculate rate over the next interval */
1184     queue->last_out_elapsed = elapsed;
1185     queue->bytes_out = 0;
1186   }
1187   if (queue->byte_in_rate > 0.0) {
1188     queue->cur_level.rate_time =
1189         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1190   }
1191   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1192       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1193 }
1194
1195 static void
1196 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1197 {
1198   guint64 reading_pos, max_reading_pos;
1199
1200   reading_pos = pos;
1201   max_reading_pos = range->max_reading_pos;
1202
1203   max_reading_pos = MAX (max_reading_pos, reading_pos);
1204
1205   GST_DEBUG_OBJECT (queue,
1206       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1207       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1208   range->max_reading_pos = max_reading_pos;
1209
1210   update_cur_level (queue, range);
1211 }
1212
1213 static gboolean
1214 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1215 {
1216   GstEvent *event;
1217   gboolean res;
1218
1219   /* until we receive the FLUSH_STOP from this seek, we skip data */
1220   queue->seeking = TRUE;
1221   GST_QUEUE2_MUTEX_UNLOCK (queue);
1222
1223   debug_ranges (queue);
1224
1225   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1226
1227   event =
1228       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1229       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1230       GST_SEEK_TYPE_NONE, -1);
1231
1232   res = gst_pad_push_event (queue->sinkpad, event);
1233   GST_QUEUE2_MUTEX_LOCK (queue);
1234
1235   if (res) {
1236     /* Between us sending the seek event and re-acquiring the lock, the source
1237      * thread might already have pushed data and moved along the range's
1238      * writing_pos beyond the seek offset. In that case we don't want to set
1239      * the writing position back to the requested seek position, as it would
1240      * cause data to be written to the wrong offset in the file or ring buffer.
1241      * We still do the add_range call to switch the current range to the
1242      * requested range, or create one if one doesn't exist yet. */
1243     queue->current = add_range (queue, offset, FALSE);
1244   }
1245
1246   return res;
1247 }
1248
1249 /* get the threshold for when we decide to seek rather than wait */
1250 static guint64
1251 get_seek_threshold (GstQueue2 * queue)
1252 {
1253   guint64 threshold;
1254
1255   /* FIXME, find a good threshold based on the incoming rate. */
1256   threshold = 1024 * 512;
1257
1258   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1259     threshold = MIN (threshold,
1260         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1261   }
1262   return threshold;
1263 }
1264
1265 /* see if there is enough data in the file to read a full buffer */
1266 static gboolean
1267 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1268 {
1269   GstQueue2Range *range;
1270
1271   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1272       offset, length);
1273
1274   if ((range = find_range (queue, offset))) {
1275     if (queue->current != range) {
1276       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1277       perform_seek_to_offset (queue, range->writing_pos);
1278     }
1279
1280     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1281         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1282
1283     /* we have a range for offset */
1284     GST_DEBUG_OBJECT (queue,
1285         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1286         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1287
1288     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1289       return TRUE;
1290
1291     if (offset + length <= range->writing_pos)
1292       return TRUE;
1293     else
1294       GST_DEBUG_OBJECT (queue,
1295           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1296           (offset + length) - range->writing_pos);
1297
1298   } else {
1299     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1300         " len %u", offset, length);
1301     /* we don't have the range, see how far away we are */
1302     if (!queue->is_eos && queue->current) {
1303       guint64 threshold = get_seek_threshold (queue);
1304
1305       if (offset >= queue->current->offset && offset <=
1306           queue->current->writing_pos + threshold) {
1307         GST_INFO_OBJECT (queue,
1308             "requested data is within range, wait for data");
1309         return FALSE;
1310       }
1311     }
1312
1313     /* too far away, do a seek */
1314     perform_seek_to_offset (queue, offset);
1315   }
1316
1317   return FALSE;
1318 }
1319
1320 #ifdef HAVE_FSEEKO
1321 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1322 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1323 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1324 #else
1325 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1326 #endif
1327
1328 static GstFlowReturn
1329 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1330     guint8 * dst, gint64 * read_return)
1331 {
1332   guint8 *ring_buffer;
1333   size_t res;
1334
1335   ring_buffer = queue->ring_buffer;
1336
1337   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1338     goto seek_failed;
1339
1340   /* this should not block */
1341   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1342       length, offset);
1343   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1344     res = fread (dst, 1, length, queue->temp_file);
1345   } else {
1346     memcpy (dst, ring_buffer + offset, length);
1347     res = length;
1348   }
1349
1350   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1351
1352   if (G_UNLIKELY (res < length)) {
1353     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1354       goto could_not_read;
1355     /* check for errors or EOF */
1356     if (ferror (queue->temp_file))
1357       goto could_not_read;
1358     if (feof (queue->temp_file) && length > 0)
1359       goto eos;
1360   }
1361
1362   *read_return = res;
1363
1364   return GST_FLOW_OK;
1365
1366 seek_failed:
1367   {
1368     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1369     return GST_FLOW_ERROR;
1370   }
1371 could_not_read:
1372   {
1373     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1374     return GST_FLOW_ERROR;
1375   }
1376 eos:
1377   {
1378     GST_DEBUG ("non-regular file hits EOS");
1379     return GST_FLOW_EOS;
1380   }
1381 }
1382
1383 static GstFlowReturn
1384 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1385     GstBuffer ** buffer)
1386 {
1387   GstBuffer *buf;
1388   GstMapInfo info;
1389   guint8 *data;
1390   guint64 file_offset;
1391   guint block_length, remaining, read_length;
1392   guint64 rb_size;
1393   guint64 max_size;
1394   guint64 rpos;
1395   GstFlowReturn ret = GST_FLOW_OK;
1396
1397   /* allocate the output buffer of the requested size */
1398   if (*buffer == NULL)
1399     buf = gst_buffer_new_allocate (NULL, length, NULL);
1400   else
1401     buf = *buffer;
1402
1403   gst_buffer_map (buf, &info, GST_MAP_WRITE);
1404   data = info.data;
1405
1406   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1407       offset);
1408
1409   rpos = offset;
1410   rb_size = queue->ring_buffer_max_size;
1411   max_size = QUEUE_MAX_BYTES (queue);
1412
1413   remaining = length;
1414   while (remaining > 0) {
1415     /* configure how much/whether to read */
1416     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1417       read_length = 0;
1418
1419       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1420         guint64 level;
1421
1422         /* calculate how far away the offset is */
1423         if (queue->current->writing_pos > rpos)
1424           level = queue->current->writing_pos - rpos;
1425         else
1426           level = 0;
1427
1428         GST_DEBUG_OBJECT (queue,
1429             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1430             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1431             rpos, queue->current->writing_pos, level, max_size);
1432
1433         if (level >= max_size) {
1434           /* we don't have the data but if we have a ring buffer that is full, we
1435            * need to read */
1436           GST_DEBUG_OBJECT (queue,
1437               "ring buffer full, reading QUEUE_MAX_BYTES %"
1438               G_GUINT64_FORMAT " bytes", max_size);
1439           read_length = max_size;
1440         } else if (queue->is_eos) {
1441           /* won't get any more data so read any data we have */
1442           if (level) {
1443             GST_DEBUG_OBJECT (queue,
1444                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1445                 level);
1446             read_length = level;
1447             remaining = level;
1448             length = level;
1449           } else
1450             goto hit_eos;
1451         }
1452       }
1453
1454       if (read_length == 0) {
1455         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1456           GST_DEBUG_OBJECT (queue,
1457               "update current position [%" G_GUINT64_FORMAT "-%"
1458               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1459           update_cur_pos (queue, queue->current, rpos);
1460           GST_QUEUE2_SIGNAL_DEL (queue);
1461         }
1462
1463         if (queue->use_buffering)
1464           update_buffering (queue);
1465
1466         GST_DEBUG_OBJECT (queue, "waiting for add");
1467         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1468         continue;
1469       }
1470     } else {
1471       /* we have the requested data so read it */
1472       read_length = remaining;
1473     }
1474
1475     /* set range reading_pos to actual reading position for this read */
1476     queue->current->reading_pos = rpos;
1477
1478     /* configure how much and from where to read */
1479     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1480       file_offset =
1481           (queue->current->rb_offset + (rpos -
1482               queue->current->offset)) % rb_size;
1483       if (file_offset + read_length > rb_size) {
1484         block_length = rb_size - file_offset;
1485       } else {
1486         block_length = read_length;
1487       }
1488     } else {
1489       file_offset = rpos;
1490       block_length = read_length;
1491     }
1492
1493     /* while we still have data to read, we loop */
1494     while (read_length > 0) {
1495       gint64 read_return;
1496
1497       ret =
1498           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1499           data, &read_return);
1500       if (ret != GST_FLOW_OK)
1501         goto read_error;
1502
1503       file_offset += read_return;
1504       if (QUEUE_IS_USING_RING_BUFFER (queue))
1505         file_offset %= rb_size;
1506
1507       data += read_return;
1508       read_length -= read_return;
1509       block_length = read_length;
1510       remaining -= read_return;
1511
1512       rpos = (queue->current->reading_pos += read_return);
1513       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1514     }
1515     GST_QUEUE2_SIGNAL_DEL (queue);
1516     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1517   }
1518
1519   gst_buffer_unmap (buf, &info);
1520   gst_buffer_resize (buf, 0, length);
1521
1522   GST_BUFFER_OFFSET (buf) = offset;
1523   GST_BUFFER_OFFSET_END (buf) = offset + length;
1524
1525   *buffer = buf;
1526
1527   return ret;
1528
1529   /* ERRORS */
1530 hit_eos:
1531   {
1532     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1533     gst_buffer_unmap (buf, &info);
1534     if (*buffer == NULL)
1535       gst_buffer_unref (buf);
1536     return GST_FLOW_EOS;
1537   }
1538 out_flushing:
1539   {
1540     GST_DEBUG_OBJECT (queue, "we are flushing");
1541     gst_buffer_unmap (buf, &info);
1542     if (*buffer == NULL)
1543       gst_buffer_unref (buf);
1544     return GST_FLOW_FLUSHING;
1545   }
1546 read_error:
1547   {
1548     GST_DEBUG_OBJECT (queue, "we have a read error");
1549     gst_buffer_unmap (buf, &info);
1550     if (*buffer == NULL)
1551       gst_buffer_unref (buf);
1552     return ret;
1553   }
1554 }
1555
1556 /* should be called with QUEUE_LOCK */
1557 static GstMiniObject *
1558 gst_queue2_read_item_from_file (GstQueue2 * queue)
1559 {
1560   GstMiniObject *item;
1561
1562   if (queue->stream_start_event != NULL) {
1563     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1564     queue->stream_start_event = NULL;
1565   } else if (queue->starting_segment != NULL) {
1566     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1567     queue->starting_segment = NULL;
1568   } else {
1569     GstFlowReturn ret;
1570     GstBuffer *buffer = NULL;
1571     guint64 reading_pos;
1572
1573     reading_pos = queue->current->reading_pos;
1574
1575     ret =
1576         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1577         &buffer);
1578
1579     switch (ret) {
1580       case GST_FLOW_OK:
1581         item = GST_MINI_OBJECT_CAST (buffer);
1582         break;
1583       case GST_FLOW_EOS:
1584         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1585         break;
1586       default:
1587         item = NULL;
1588         break;
1589     }
1590   }
1591   return item;
1592 }
1593
1594 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1595  * the temp filename. */
1596 static gboolean
1597 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1598 {
1599   gint fd = -1;
1600   gchar *name = NULL;
1601
1602   if (queue->temp_file)
1603     goto already_opened;
1604
1605   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1606
1607   /* If temp_template was set, allocate a filename and open that file */
1608
1609   /* nothing to do */
1610   if (queue->temp_template == NULL)
1611     goto no_directory;
1612
1613   /* make copy of the template, we don't want to change this */
1614   name = g_strdup (queue->temp_template);
1615
1616 #ifdef __BIONIC__
1617   fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1618 #else
1619   fd = g_mkstemp (name);
1620 #endif
1621
1622   if (fd == -1)
1623     goto mkstemp_failed;
1624
1625   /* open the file for update/writing */
1626   queue->temp_file = fdopen (fd, "wb+");
1627   /* error creating file */
1628   if (queue->temp_file == NULL)
1629     goto open_failed;
1630
1631   g_free (queue->temp_location);
1632   queue->temp_location = name;
1633
1634   GST_QUEUE2_MUTEX_UNLOCK (queue);
1635
1636   /* we can't emit the notify with the lock */
1637   g_object_notify (G_OBJECT (queue), "temp-location");
1638
1639   GST_QUEUE2_MUTEX_LOCK (queue);
1640
1641   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1642
1643   return TRUE;
1644
1645   /* ERRORS */
1646 already_opened:
1647   {
1648     GST_DEBUG_OBJECT (queue, "temp file was already open");
1649     return TRUE;
1650   }
1651 no_directory:
1652   {
1653     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1654         (_("No Temp directory specified.")), (NULL));
1655     return FALSE;
1656   }
1657 mkstemp_failed:
1658   {
1659     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1660         (_("Could not create temp file \"%s\"."), queue->temp_template),
1661         GST_ERROR_SYSTEM);
1662     g_free (name);
1663     return FALSE;
1664   }
1665 open_failed:
1666   {
1667     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1668         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1669     g_free (name);
1670     if (fd != -1)
1671       close (fd);
1672     return FALSE;
1673   }
1674 }
1675
1676 static void
1677 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1678 {
1679   /* nothing to do */
1680   if (queue->temp_file == NULL)
1681     return;
1682
1683   GST_DEBUG_OBJECT (queue, "closing temp file");
1684
1685   fflush (queue->temp_file);
1686   fclose (queue->temp_file);
1687
1688   if (queue->temp_remove) {
1689     if (remove (queue->temp_location) < 0) {
1690       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1691           queue->temp_location, g_strerror (errno));
1692     }
1693   }
1694
1695   queue->temp_file = NULL;
1696   clean_ranges (queue);
1697 }
1698
1699 static void
1700 gst_queue2_flush_temp_file (GstQueue2 * queue)
1701 {
1702   if (queue->temp_file == NULL)
1703     return;
1704
1705   GST_DEBUG_OBJECT (queue, "flushing temp file");
1706
1707   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1708 }
1709
1710 static void
1711 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1712 {
1713   if (!QUEUE_IS_USING_QUEUE (queue)) {
1714     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1715       gst_queue2_flush_temp_file (queue);
1716     init_ranges (queue);
1717   } else {
1718     while (!g_queue_is_empty (&queue->queue)) {
1719       GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1720
1721       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1722           && GST_EVENT_IS_STICKY (qitem->item)
1723           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1724           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1725         gst_pad_store_sticky_event (queue->srcpad,
1726             GST_EVENT_CAST (qitem->item));
1727       }
1728
1729       /* Then lose another reference because we are supposed to destroy that
1730          data when flushing */
1731       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1732         gst_mini_object_unref (qitem->item);
1733       g_slice_free (GstQueue2Item, qitem);
1734     }
1735   }
1736   queue->last_query = FALSE;
1737   g_cond_signal (&queue->query_handled);
1738   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1739   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1740   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1741   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1742   queue->sink_tainted = queue->src_tainted = TRUE;
1743   if (queue->starting_segment != NULL)
1744     gst_event_unref (queue->starting_segment);
1745   queue->starting_segment = NULL;
1746   queue->segment_event_received = FALSE;
1747   gst_event_replace (&queue->stream_start_event, NULL);
1748
1749   /* we deleted a lot of something */
1750   GST_QUEUE2_SIGNAL_DEL (queue);
1751 }
1752
1753 static gboolean
1754 gst_queue2_wait_free_space (GstQueue2 * queue)
1755 {
1756   /* We make space available if we're "full" according to whatever
1757    * the user defined as "full". */
1758   if (gst_queue2_is_filled (queue)) {
1759     gboolean started;
1760
1761     /* pause the timer while we wait. The fact that we are waiting does not mean
1762      * the byterate on the input pad is lower */
1763     if ((started = queue->in_timer_started))
1764       g_timer_stop (queue->in_timer);
1765
1766     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1767         "queue is full, waiting for free space");
1768     do {
1769       GST_QUEUE2_MUTEX_UNLOCK (queue);
1770       g_signal_emit (queue, gst_queue2_signals[SIGNAL_OVERRUN], 0);
1771       GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
1772       /* we recheck, the signal could have changed the thresholds */
1773       if (!gst_queue2_is_filled (queue))
1774         break;
1775
1776       /* Wait for space to be available, we could be unlocked because of a flush. */
1777       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1778     }
1779     while (gst_queue2_is_filled (queue));
1780
1781     /* and continue if we were running before */
1782     if (started)
1783       g_timer_continue (queue->in_timer);
1784   }
1785   return TRUE;
1786
1787   /* ERRORS */
1788 out_flushing:
1789   {
1790     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1791     return FALSE;
1792   }
1793 }
1794
1795 static gboolean
1796 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1797 {
1798   GstMapInfo info;
1799   guint8 *data, *ring_buffer;
1800   guint size, rb_size;
1801   guint64 writing_pos, new_writing_pos;
1802   GstQueue2Range *range, *prev, *next;
1803   gboolean do_seek = FALSE;
1804
1805   if (QUEUE_IS_USING_RING_BUFFER (queue))
1806     writing_pos = queue->current->rb_writing_pos;
1807   else
1808     writing_pos = queue->current->writing_pos;
1809   ring_buffer = queue->ring_buffer;
1810   rb_size = queue->ring_buffer_max_size;
1811
1812   gst_buffer_map (buffer, &info, GST_MAP_READ);
1813
1814   size = info.size;
1815   data = info.data;
1816
1817   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1818       writing_pos);
1819
1820   /* sanity check */
1821   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1822       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1823     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1824         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1825         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1826   }
1827
1828   while (size > 0) {
1829     guint to_write;
1830
1831     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1832       gint64 space;
1833
1834       /* calculate the space in the ring buffer not used by data from
1835        * the current range */
1836       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1837         /* wait until there is some free space */
1838         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1839       }
1840       /* get the amount of space we have */
1841       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1842
1843       /* calculate if we need to split or if we can write the entire
1844        * buffer now */
1845       to_write = MIN (size, space);
1846
1847       /* the writing position in the ring buffer after writing (part
1848        * or all of) the buffer */
1849       new_writing_pos = (writing_pos + to_write) % rb_size;
1850
1851       prev = NULL;
1852       range = queue->ranges;
1853
1854       /* if we need to overwrite data in the ring buffer, we need to
1855        * update the ranges
1856        *
1857        * warning: this code is complicated and includes some
1858        * simplifications - pen, paper and diagrams for the cases
1859        * recommended! */
1860       while (range) {
1861         guint64 range_data_start, range_data_end;
1862         GstQueue2Range *range_to_destroy = NULL;
1863
1864         range_data_start = range->rb_offset;
1865         range_data_end = range->rb_writing_pos;
1866
1867         /* handle the special case where the range has no data in it */
1868         if (range->writing_pos == range->offset) {
1869           if (range != queue->current) {
1870             GST_DEBUG_OBJECT (queue,
1871                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1872                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1873             /* remove range */
1874             range_to_destroy = range;
1875             if (prev)
1876               prev->next = range->next;
1877           }
1878           goto next_range;
1879         }
1880
1881         if (range_data_end > range_data_start) {
1882           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1883             goto next_range;
1884
1885           if (new_writing_pos > range_data_start) {
1886             if (new_writing_pos >= range_data_end) {
1887               GST_DEBUG_OBJECT (queue,
1888                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1889                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1890               /* remove range */
1891               range_to_destroy = range;
1892               if (prev)
1893                 prev->next = range->next;
1894             } else {
1895               GST_DEBUG_OBJECT (queue,
1896                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1897                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1898                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1899                   range->offset + new_writing_pos - range_data_start,
1900                   new_writing_pos);
1901               range->offset += (new_writing_pos - range_data_start);
1902               range->rb_offset = new_writing_pos;
1903             }
1904           }
1905         } else {
1906           guint64 new_wpos_virt = writing_pos + to_write;
1907
1908           if (new_wpos_virt <= range_data_start)
1909             goto next_range;
1910
1911           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1912             GST_DEBUG_OBJECT (queue,
1913                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1914                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1915             /* remove range */
1916             range_to_destroy = range;
1917             if (prev)
1918               prev->next = range->next;
1919           } else {
1920             GST_DEBUG_OBJECT (queue,
1921                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1922                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1923                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1924                 range->offset + new_writing_pos - range_data_start,
1925                 new_writing_pos);
1926             range->offset += (new_wpos_virt - range_data_start);
1927             range->rb_offset = new_writing_pos;
1928           }
1929         }
1930
1931       next_range:
1932         if (!range_to_destroy)
1933           prev = range;
1934
1935         range = range->next;
1936         if (range_to_destroy) {
1937           if (range_to_destroy == queue->ranges)
1938             queue->ranges = range;
1939           g_slice_free (GstQueue2Range, range_to_destroy);
1940           range_to_destroy = NULL;
1941         }
1942       }
1943     } else {
1944       to_write = size;
1945       new_writing_pos = writing_pos + to_write;
1946     }
1947
1948     if (QUEUE_IS_USING_TEMP_FILE (queue)
1949         && FSEEK_FILE (queue->temp_file, writing_pos))
1950       goto seek_failed;
1951
1952     if (new_writing_pos > writing_pos) {
1953       GST_INFO_OBJECT (queue,
1954           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1955           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
1956           queue->current->writing_pos, queue->current->rb_writing_pos);
1957       /* either not using ring buffer or no wrapping, just write */
1958       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1959         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
1960           goto handle_error;
1961       } else {
1962         memcpy (ring_buffer + writing_pos, data, to_write);
1963       }
1964
1965       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1966         /* try to merge with next range */
1967         while ((next = queue->current->next)) {
1968           GST_INFO_OBJECT (queue,
1969               "checking merge with next range %" G_GUINT64_FORMAT " < %"
1970               G_GUINT64_FORMAT, new_writing_pos, next->offset);
1971           if (new_writing_pos < next->offset)
1972             break;
1973
1974           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
1975               next->writing_pos);
1976
1977           /* remove the group */
1978           queue->current->next = next->next;
1979
1980           /* We use the threshold to decide if we want to do a seek or simply
1981            * read the data again. If there is not so much data in the range we
1982            * prefer to avoid to seek and read it again. */
1983           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
1984             /* the new range had more data than the threshold, it's worth keeping
1985              * it and doing a seek. */
1986             new_writing_pos = next->writing_pos;
1987             do_seek = TRUE;
1988           }
1989           g_slice_free (GstQueue2Range, next);
1990         }
1991         goto update_and_signal;
1992       }
1993     } else {
1994       /* wrapping */
1995       guint block_one, block_two;
1996
1997       block_one = rb_size - writing_pos;
1998       block_two = to_write - block_one;
1999
2000       if (block_one > 0) {
2001         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
2002         /* write data to end of ring buffer */
2003         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2004           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
2005             goto handle_error;
2006         } else {
2007           memcpy (ring_buffer + writing_pos, data, block_one);
2008         }
2009       }
2010
2011       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
2012         goto seek_failed;
2013
2014       if (block_two > 0) {
2015         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
2016         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2017           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
2018             goto handle_error;
2019         } else {
2020           memcpy (ring_buffer, data + block_one, block_two);
2021         }
2022       }
2023     }
2024
2025   update_and_signal:
2026     /* update the writing positions */
2027     size -= to_write;
2028     GST_INFO_OBJECT (queue,
2029         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
2030         to_write, writing_pos, size);
2031
2032     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2033       data += to_write;
2034       queue->current->writing_pos += to_write;
2035       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
2036     } else {
2037       queue->current->writing_pos = writing_pos = new_writing_pos;
2038     }
2039     if (do_seek)
2040       perform_seek_to_offset (queue, new_writing_pos);
2041
2042     update_cur_level (queue, queue->current);
2043
2044     /* update the buffering status */
2045     if (queue->use_buffering)
2046       update_buffering (queue);
2047
2048     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2049         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2050
2051     GST_QUEUE2_SIGNAL_ADD (queue);
2052   }
2053
2054   gst_buffer_unmap (buffer, &info);
2055
2056   return TRUE;
2057
2058   /* ERRORS */
2059 out_flushing:
2060   {
2061     GST_DEBUG_OBJECT (queue, "we are flushing");
2062     gst_buffer_unmap (buffer, &info);
2063     /* FIXME - GST_FLOW_EOS ? */
2064     return FALSE;
2065   }
2066 seek_failed:
2067   {
2068     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2069     gst_buffer_unmap (buffer, &info);
2070     return FALSE;
2071   }
2072 handle_error:
2073   {
2074     switch (errno) {
2075       case ENOSPC:{
2076         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2077         break;
2078       }
2079       default:{
2080         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2081             (_("Error while writing to download file.")),
2082             ("%s", g_strerror (errno)));
2083       }
2084     }
2085     gst_buffer_unmap (buffer, &info);
2086     return FALSE;
2087   }
2088 }
2089
2090 static gboolean
2091 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2092 {
2093   GstQueue2 *queue = q;
2094
2095   GST_TRACE_OBJECT (queue,
2096       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2097       gst_buffer_get_size (*buf));
2098
2099   if (!gst_queue2_create_write (queue, *buf)) {
2100     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2101     return FALSE;
2102   }
2103   return TRUE;
2104 }
2105
2106 static gboolean
2107 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
2108 {
2109   guint *p_size = data;
2110   gsize buf_size;
2111
2112   buf_size = gst_buffer_get_size (*buf);
2113   GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
2114   *p_size += buf_size;
2115   return TRUE;
2116 }
2117
2118 /* enqueue an item an update the level stats */
2119 static void
2120 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2121     GstQueue2ItemType item_type)
2122 {
2123   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2124     GstBuffer *buffer;
2125     guint size;
2126
2127     buffer = GST_BUFFER_CAST (item);
2128     size = gst_buffer_get_size (buffer);
2129
2130     /* add buffer to the statistics */
2131     if (QUEUE_IS_USING_QUEUE (queue)) {
2132       queue->cur_level.buffers++;
2133       queue->cur_level.bytes += size;
2134     }
2135     queue->bytes_in += size;
2136
2137     /* apply new buffer to segment stats */
2138     apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE);
2139     /* update the byterate stats */
2140     update_in_rates (queue);
2141
2142     if (!QUEUE_IS_USING_QUEUE (queue)) {
2143       /* FIXME - check return value? */
2144       gst_queue2_create_write (queue, buffer);
2145     }
2146   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2147     GstBufferList *buffer_list;
2148     guint size = 0;
2149
2150     buffer_list = GST_BUFFER_LIST_CAST (item);
2151
2152     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2153     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2154
2155     /* add buffer to the statistics */
2156     if (QUEUE_IS_USING_QUEUE (queue)) {
2157       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2158       queue->cur_level.bytes += size;
2159     }
2160     queue->bytes_in += size;
2161
2162     /* apply new buffer to segment stats */
2163     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2164
2165     /* update the byterate stats */
2166     update_in_rates (queue);
2167
2168     if (!QUEUE_IS_USING_QUEUE (queue)) {
2169       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2170     }
2171   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2172     GstEvent *event;
2173
2174     event = GST_EVENT_CAST (item);
2175
2176     switch (GST_EVENT_TYPE (event)) {
2177       case GST_EVENT_EOS:
2178         /* Zero the thresholds, this makes sure the queue is completely
2179          * filled and we can read all data from the queue. */
2180         GST_DEBUG_OBJECT (queue, "we have EOS");
2181         queue->is_eos = TRUE;
2182         break;
2183       case GST_EVENT_SEGMENT:
2184         apply_segment (queue, event, &queue->sink_segment, TRUE);
2185         /* This is our first new segment, we hold it
2186          * as we can't save it on the temp file */
2187         if (!QUEUE_IS_USING_QUEUE (queue)) {
2188           if (queue->segment_event_received)
2189             goto unexpected_event;
2190
2191           queue->segment_event_received = TRUE;
2192           if (queue->starting_segment != NULL)
2193             gst_event_unref (queue->starting_segment);
2194           queue->starting_segment = event;
2195           item = NULL;
2196         }
2197         /* a new segment allows us to accept more buffers if we got EOS
2198          * from downstream */
2199         queue->unexpected = FALSE;
2200         break;
2201       case GST_EVENT_GAP:
2202         apply_gap (queue, event, &queue->sink_segment, TRUE);
2203         break;
2204       case GST_EVENT_STREAM_START:
2205         if (!QUEUE_IS_USING_QUEUE (queue)) {
2206           gst_event_replace (&queue->stream_start_event, event);
2207           gst_event_unref (event);
2208           item = NULL;
2209         }
2210         break;
2211       case GST_EVENT_CAPS:{
2212         GstCaps *caps;
2213
2214         gst_event_parse_caps (event, &caps);
2215         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2216
2217         if (!QUEUE_IS_USING_QUEUE (queue)) {
2218           GST_LOG ("Dropping caps event, not using queue");
2219           gst_event_unref (event);
2220           item = NULL;
2221         }
2222         break;
2223       }
2224       default:
2225         if (!QUEUE_IS_USING_QUEUE (queue))
2226           goto unexpected_event;
2227         break;
2228     }
2229   } else if (GST_IS_QUERY (item)) {
2230     /* Can't happen as we check that in the caller */
2231     if (!QUEUE_IS_USING_QUEUE (queue))
2232       g_assert_not_reached ();
2233   } else {
2234     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2235         item, GST_OBJECT_NAME (queue));
2236     /* we can't really unref since we don't know what it is */
2237     item = NULL;
2238   }
2239
2240   if (item) {
2241     /* update the buffering status */
2242     if (queue->use_buffering)
2243       update_buffering (queue);
2244
2245     if (QUEUE_IS_USING_QUEUE (queue)) {
2246       GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2247       qitem->type = item_type;
2248       qitem->item = item;
2249       g_queue_push_tail (&queue->queue, qitem);
2250     } else {
2251       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2252     }
2253
2254     GST_QUEUE2_SIGNAL_ADD (queue);
2255   }
2256
2257   return;
2258
2259   /* ERRORS */
2260 unexpected_event:
2261   {
2262     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2263
2264     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2265         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2266         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2267     gst_event_unref (GST_EVENT_CAST (item));
2268     return;
2269   }
2270 }
2271
2272 /* dequeue an item from the queue and update level stats */
2273 static GstMiniObject *
2274 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2275 {
2276   GstMiniObject *item;
2277
2278   if (!QUEUE_IS_USING_QUEUE (queue)) {
2279     item = gst_queue2_read_item_from_file (queue);
2280   } else {
2281     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2282
2283     if (qitem == NULL)
2284       goto no_item;
2285
2286     item = qitem->item;
2287     g_slice_free (GstQueue2Item, qitem);
2288   }
2289
2290   if (item == NULL)
2291     goto no_item;
2292
2293   if (GST_IS_BUFFER (item)) {
2294     GstBuffer *buffer;
2295     guint size;
2296
2297     buffer = GST_BUFFER_CAST (item);
2298     size = gst_buffer_get_size (buffer);
2299     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2300
2301     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2302         "retrieved buffer %p from queue", buffer);
2303
2304     if (QUEUE_IS_USING_QUEUE (queue)) {
2305       queue->cur_level.buffers--;
2306       queue->cur_level.bytes -= size;
2307     }
2308     queue->bytes_out += size;
2309
2310     apply_buffer (queue, buffer, &queue->src_segment, size, FALSE);
2311     /* update the byterate stats */
2312     update_out_rates (queue);
2313     /* update the buffering */
2314     if (queue->use_buffering)
2315       update_buffering (queue);
2316
2317   } else if (GST_IS_EVENT (item)) {
2318     GstEvent *event = GST_EVENT_CAST (item);
2319
2320     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2321
2322     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2323         "retrieved event %p from queue", event);
2324
2325     switch (GST_EVENT_TYPE (event)) {
2326       case GST_EVENT_EOS:
2327         /* queue is empty now that we dequeued the EOS */
2328         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2329         break;
2330       case GST_EVENT_SEGMENT:
2331         apply_segment (queue, event, &queue->src_segment, FALSE);
2332         break;
2333       case GST_EVENT_GAP:
2334         apply_gap (queue, event, &queue->src_segment, FALSE);
2335         break;
2336       default:
2337         break;
2338     }
2339   } else if (GST_IS_BUFFER_LIST (item)) {
2340     GstBufferList *buffer_list;
2341     guint size = 0;
2342
2343     buffer_list = GST_BUFFER_LIST_CAST (item);
2344     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2345     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2346
2347     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2348         "retrieved buffer list %p from queue", buffer_list);
2349
2350     if (QUEUE_IS_USING_QUEUE (queue)) {
2351       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2352       queue->cur_level.bytes -= size;
2353     }
2354     queue->bytes_out += size;
2355
2356     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2357     /* update the byterate stats */
2358     update_out_rates (queue);
2359     /* update the buffering */
2360     if (queue->use_buffering)
2361       update_buffering (queue);
2362   } else if (GST_IS_QUERY (item)) {
2363     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2364         "retrieved query %p from queue", item);
2365     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2366   } else {
2367     g_warning
2368         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2369         item, GST_OBJECT_NAME (queue));
2370     item = NULL;
2371     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2372   }
2373   GST_QUEUE2_SIGNAL_DEL (queue);
2374
2375   return item;
2376
2377   /* ERRORS */
2378 no_item:
2379   {
2380     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2381     return NULL;
2382   }
2383 }
2384
2385 static gboolean
2386 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2387     GstEvent * event)
2388 {
2389   gboolean ret = TRUE;
2390   GstQueue2 *queue;
2391
2392   queue = GST_QUEUE2 (parent);
2393
2394   switch (GST_EVENT_TYPE (event)) {
2395     case GST_EVENT_FLUSH_START:
2396     {
2397       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2398       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2399         /* forward event */
2400         ret = gst_pad_push_event (queue->srcpad, event);
2401
2402         /* now unblock the chain function */
2403         GST_QUEUE2_MUTEX_LOCK (queue);
2404         queue->srcresult = GST_FLOW_FLUSHING;
2405         queue->sinkresult = GST_FLOW_FLUSHING;
2406         /* unblock the loop and chain functions */
2407         GST_QUEUE2_SIGNAL_ADD (queue);
2408         GST_QUEUE2_SIGNAL_DEL (queue);
2409         queue->last_query = FALSE;
2410         g_cond_signal (&queue->query_handled);
2411         GST_QUEUE2_MUTEX_UNLOCK (queue);
2412
2413         /* make sure it pauses, this should happen since we sent
2414          * flush_start downstream. */
2415         gst_pad_pause_task (queue->srcpad);
2416         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2417       } else {
2418         GST_QUEUE2_MUTEX_LOCK (queue);
2419         /* flush the sink pad */
2420         queue->sinkresult = GST_FLOW_FLUSHING;
2421         GST_QUEUE2_SIGNAL_DEL (queue);
2422         queue->last_query = FALSE;
2423         g_cond_signal (&queue->query_handled);
2424         GST_QUEUE2_MUTEX_UNLOCK (queue);
2425
2426         gst_event_unref (event);
2427       }
2428       break;
2429     }
2430     case GST_EVENT_FLUSH_STOP:
2431     {
2432       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2433
2434       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2435         /* forward event */
2436         ret = gst_pad_push_event (queue->srcpad, event);
2437
2438         GST_QUEUE2_MUTEX_LOCK (queue);
2439         gst_queue2_locked_flush (queue, FALSE, TRUE);
2440         queue->srcresult = GST_FLOW_OK;
2441         queue->sinkresult = GST_FLOW_OK;
2442         queue->is_eos = FALSE;
2443         queue->unexpected = FALSE;
2444         queue->seeking = FALSE;
2445         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2446         /* reset rate counters */
2447         reset_rate_timer (queue);
2448         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2449             queue->srcpad, NULL);
2450         GST_QUEUE2_MUTEX_UNLOCK (queue);
2451       } else {
2452         GST_QUEUE2_MUTEX_LOCK (queue);
2453         queue->segment_event_received = FALSE;
2454         queue->is_eos = FALSE;
2455         queue->unexpected = FALSE;
2456         queue->sinkresult = GST_FLOW_OK;
2457         queue->seeking = FALSE;
2458         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2459         GST_QUEUE2_MUTEX_UNLOCK (queue);
2460
2461         gst_event_unref (event);
2462       }
2463       break;
2464     }
2465     case GST_EVENT_TAG:{
2466       if (queue->use_tags_bitrate) {
2467         GstTagList *tags;
2468         guint bitrate;
2469
2470         gst_event_parse_tag (event, &tags);
2471         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2472             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2473           GST_QUEUE2_MUTEX_LOCK (queue);
2474           queue->sink_tags_bitrate = bitrate;
2475           GST_QUEUE2_MUTEX_UNLOCK (queue);
2476           GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
2477         }
2478       }
2479       /* Fall-through */
2480     }
2481     default:
2482       if (GST_EVENT_IS_SERIALIZED (event)) {
2483         /* serialized events go in the queue */
2484         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2485         if (queue->srcresult != GST_FLOW_OK) {
2486           /* Errors in sticky event pushing are no problem and ignored here
2487            * as they will cause more meaningful errors during data flow.
2488            * For EOS events, that are not followed by data flow, we still
2489            * return FALSE here though and report an error.
2490            */
2491           if (!GST_EVENT_IS_STICKY (event)) {
2492             goto out_flow_error;
2493           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2494             if (queue->srcresult == GST_FLOW_NOT_LINKED
2495                 || queue->srcresult < GST_FLOW_EOS) {
2496               GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2497                   (_("Internal data flow error.")),
2498                   ("streaming task paused, reason %s (%d)",
2499                       gst_flow_get_name (queue->srcresult), queue->srcresult));
2500             }
2501             goto out_flow_error;
2502           }
2503         }
2504         /* refuse more events on EOS */
2505         if (queue->is_eos)
2506           goto out_eos;
2507         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2508         GST_QUEUE2_MUTEX_UNLOCK (queue);
2509         gst_queue2_post_buffering (queue);
2510       } else {
2511         /* non-serialized events are passed upstream. */
2512         ret = gst_pad_push_event (queue->srcpad, event);
2513       }
2514       break;
2515   }
2516   return ret;
2517
2518   /* ERRORS */
2519 out_flushing:
2520   {
2521     GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2522     GST_QUEUE2_MUTEX_UNLOCK (queue);
2523     gst_event_unref (event);
2524     return FALSE;
2525   }
2526 out_eos:
2527   {
2528     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2529     GST_QUEUE2_MUTEX_UNLOCK (queue);
2530     gst_event_unref (event);
2531     return FALSE;
2532   }
2533 out_flow_error:
2534   {
2535     GST_LOG_OBJECT (queue,
2536         "refusing event, we have a downstream flow error: %s",
2537         gst_flow_get_name (queue->srcresult));
2538     GST_QUEUE2_MUTEX_UNLOCK (queue);
2539     gst_event_unref (event);
2540     return FALSE;
2541   }
2542 }
2543
2544 static gboolean
2545 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2546     GstQuery * query)
2547 {
2548   GstQueue2 *queue;
2549   gboolean res;
2550
2551   queue = GST_QUEUE2 (parent);
2552
2553   switch (GST_QUERY_TYPE (query)) {
2554     default:
2555       if (GST_QUERY_IS_SERIALIZED (query)) {
2556         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2557         /* serialized events go in the queue. We need to be certain that we
2558          * don't cause deadlocks waiting for the query return value. We check if
2559          * the queue is empty (nothing is blocking downstream and the query can
2560          * be pushed for sure) or we are not buffering. If we are buffering,
2561          * the pipeline waits to unblock downstream until our queue fills up
2562          * completely, which can not happen if we block on the query..
2563          * Therefore we only potentially block when we are not buffering. */
2564         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2565         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2566                 || !queue->use_buffering)) {
2567           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2568             gst_queue2_locked_enqueue (queue, query,
2569                 GST_QUEUE2_ITEM_TYPE_QUERY);
2570
2571             STATUS (queue, queue->sinkpad, "wait for QUERY");
2572             g_cond_wait (&queue->query_handled, &queue->qlock);
2573             if (queue->sinkresult != GST_FLOW_OK)
2574               goto out_flushing;
2575             res = queue->last_query;
2576           } else {
2577             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2578             res = FALSE;
2579           }
2580         } else {
2581           GST_DEBUG_OBJECT (queue,
2582               "refusing query, we are not using the queue");
2583           res = FALSE;
2584         }
2585         GST_QUEUE2_MUTEX_UNLOCK (queue);
2586         gst_queue2_post_buffering (queue);
2587       } else {
2588         res = gst_pad_query_default (pad, parent, query);
2589       }
2590       break;
2591   }
2592   return res;
2593
2594   /* ERRORS */
2595 out_flushing:
2596   {
2597     GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
2598     GST_QUEUE2_MUTEX_UNLOCK (queue);
2599     return FALSE;
2600   }
2601 }
2602
2603 static gboolean
2604 gst_queue2_is_empty (GstQueue2 * queue)
2605 {
2606   /* never empty on EOS */
2607   if (queue->is_eos)
2608     return FALSE;
2609
2610   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2611     return queue->current->writing_pos <= queue->current->max_reading_pos;
2612   } else {
2613     if (queue->queue.length == 0)
2614       return TRUE;
2615   }
2616
2617   return FALSE;
2618 }
2619
2620 static gboolean
2621 gst_queue2_is_filled (GstQueue2 * queue)
2622 {
2623   gboolean res;
2624
2625   /* always filled on EOS */
2626   if (queue->is_eos)
2627     return TRUE;
2628
2629 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2630     (queue->cur_level.format) >= ((alt_max) ? \
2631       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2632
2633   /* if using a ring buffer we're filled if all ring buffer space is used
2634    * _by the current range_ */
2635   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2636     guint64 rb_size = queue->ring_buffer_max_size;
2637     GST_DEBUG_OBJECT (queue,
2638         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2639         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2640     return CHECK_FILLED (bytes, rb_size);
2641   }
2642
2643   /* if using file, we're never filled if we don't have EOS */
2644   if (QUEUE_IS_USING_TEMP_FILE (queue))
2645     return FALSE;
2646
2647   /* we are never filled when we have no buffers at all */
2648   if (queue->cur_level.buffers == 0)
2649     return FALSE;
2650
2651   /* we are filled if one of the current levels exceeds the max */
2652   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2653       || CHECK_FILLED (time, 0);
2654
2655   /* if we need to, use the rate estimate to check against the max time we are
2656    * allowed to queue */
2657   if (queue->use_rate_estimate)
2658     res |= CHECK_FILLED (rate_time, 0);
2659
2660 #undef CHECK_FILLED
2661   return res;
2662 }
2663
2664 static GstFlowReturn
2665 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2666     GstMiniObject * item, GstQueue2ItemType item_type)
2667 {
2668   /* we have to lock the queue since we span threads */
2669   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2670   /* when we received EOS, we refuse more data */
2671   if (queue->is_eos)
2672     goto out_eos;
2673   /* when we received unexpected from downstream, refuse more buffers */
2674   if (queue->unexpected)
2675     goto out_unexpected;
2676
2677   /* while we didn't receive the newsegment, we're seeking and we skip data */
2678   if (queue->seeking)
2679     goto out_seeking;
2680
2681   if (!gst_queue2_wait_free_space (queue))
2682     goto out_flushing;
2683
2684   /* put buffer in queue now */
2685   gst_queue2_locked_enqueue (queue, item, item_type);
2686   GST_QUEUE2_MUTEX_UNLOCK (queue);
2687   gst_queue2_post_buffering (queue);
2688
2689   return GST_FLOW_OK;
2690
2691   /* special conditions */
2692 out_flushing:
2693   {
2694     GstFlowReturn ret = queue->sinkresult;
2695
2696     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2697         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2698     GST_QUEUE2_MUTEX_UNLOCK (queue);
2699     gst_mini_object_unref (item);
2700
2701     return ret;
2702   }
2703 out_eos:
2704   {
2705     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2706     GST_QUEUE2_MUTEX_UNLOCK (queue);
2707     gst_mini_object_unref (item);
2708
2709     return GST_FLOW_EOS;
2710   }
2711 out_seeking:
2712   {
2713     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2714     GST_QUEUE2_MUTEX_UNLOCK (queue);
2715     gst_mini_object_unref (item);
2716
2717     return GST_FLOW_OK;
2718   }
2719 out_unexpected:
2720   {
2721     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2722     GST_QUEUE2_MUTEX_UNLOCK (queue);
2723     gst_mini_object_unref (item);
2724
2725     return GST_FLOW_EOS;
2726   }
2727 }
2728
2729 static GstFlowReturn
2730 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2731 {
2732   GstQueue2 *queue;
2733
2734   queue = GST_QUEUE2 (parent);
2735
2736   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2737       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2738       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2739       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2740       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2741
2742   return gst_queue2_chain_buffer_or_buffer_list (queue,
2743       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2744 }
2745
2746 static GstFlowReturn
2747 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2748     GstBufferList * buffer_list)
2749 {
2750   GstQueue2 *queue;
2751
2752   queue = GST_QUEUE2 (parent);
2753
2754   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2755       "received buffer list %p", buffer_list);
2756
2757   return gst_queue2_chain_buffer_or_buffer_list (queue,
2758       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2759 }
2760
2761 static GstMiniObject *
2762 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2763 {
2764   GstMiniObject *data;
2765
2766   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2767
2768   /* stop pushing buffers, we dequeue all items until we see an item that we
2769    * can push again, which is EOS or SEGMENT. If there is nothing in the
2770    * queue we can push, we set a flag to make the sinkpad refuse more
2771    * buffers with an EOS return value until we receive something
2772    * pushable again or we get flushed. */
2773   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2774     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2775       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2776           "dropping EOS buffer %p", data);
2777       gst_buffer_unref (GST_BUFFER_CAST (data));
2778     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2779       GstEvent *event = GST_EVENT_CAST (data);
2780       GstEventType type = GST_EVENT_TYPE (event);
2781
2782       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2783         /* we found a pushable item in the queue, push it out */
2784         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2785             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2786         return data;
2787       }
2788       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2789           "dropping EOS event %p", event);
2790       gst_event_unref (event);
2791     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2792       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2793           "dropping EOS buffer list %p", data);
2794       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2795     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2796       queue->last_query = FALSE;
2797       g_cond_signal (&queue->query_handled);
2798       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2799     }
2800   }
2801   /* no more items in the queue. Set the unexpected flag so that upstream
2802    * make us refuse any more buffers on the sinkpad. Since we will still
2803    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2804    * task function does not shut down. */
2805   queue->unexpected = TRUE;
2806   return NULL;
2807 }
2808
2809 /* dequeue an item from the queue an push it downstream. This functions returns
2810  * the result of the push. */
2811 static GstFlowReturn
2812 gst_queue2_push_one (GstQueue2 * queue)
2813 {
2814   GstFlowReturn result = queue->srcresult;
2815   GstMiniObject *data;
2816   GstQueue2ItemType item_type;
2817
2818   data = gst_queue2_locked_dequeue (queue, &item_type);
2819   if (data == NULL)
2820     goto no_item;
2821
2822 next:
2823   STATUS (queue, queue->srcpad, "We have something dequeud");
2824   g_atomic_int_set (&queue->downstream_may_block,
2825       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2826       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2827   GST_QUEUE2_MUTEX_UNLOCK (queue);
2828   gst_queue2_post_buffering (queue);
2829
2830   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2831     GstBuffer *buffer;
2832
2833     buffer = GST_BUFFER_CAST (data);
2834
2835     result = gst_pad_push (queue->srcpad, buffer);
2836     g_atomic_int_set (&queue->downstream_may_block, 0);
2837
2838     /* need to check for srcresult here as well */
2839     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2840     if (result == GST_FLOW_EOS) {
2841       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2842       if (data != NULL)
2843         goto next;
2844       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2845        * to the caller so that the task function does not shut down */
2846       result = GST_FLOW_OK;
2847     }
2848   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2849     GstEvent *event = GST_EVENT_CAST (data);
2850     GstEventType type = GST_EVENT_TYPE (event);
2851
2852     if (type == GST_EVENT_TAG) {
2853       if (queue->use_tags_bitrate) {
2854         GstTagList *tags;
2855         guint bitrate;
2856
2857         gst_event_parse_tag (event, &tags);
2858         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2859             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2860           GST_QUEUE2_MUTEX_LOCK (queue);
2861           queue->src_tags_bitrate = bitrate;
2862           GST_QUEUE2_MUTEX_UNLOCK (queue);
2863           GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
2864         }
2865       }
2866     }
2867
2868     gst_pad_push_event (queue->srcpad, event);
2869
2870     /* if we're EOS, return EOS so that the task pauses. */
2871     if (type == GST_EVENT_EOS) {
2872       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2873           "pushed EOS event %p, return EOS", event);
2874       result = GST_FLOW_EOS;
2875     }
2876
2877     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2878   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2879     GstBufferList *buffer_list;
2880
2881     buffer_list = GST_BUFFER_LIST_CAST (data);
2882
2883     result = gst_pad_push_list (queue->srcpad, buffer_list);
2884     g_atomic_int_set (&queue->downstream_may_block, 0);
2885
2886     /* need to check for srcresult here as well */
2887     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2888     if (result == GST_FLOW_EOS) {
2889       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2890       if (data != NULL)
2891         goto next;
2892       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2893        * to the caller so that the task function does not shut down */
2894       result = GST_FLOW_OK;
2895     }
2896   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2897     GstQuery *query = GST_QUERY_CAST (data);
2898
2899     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2900     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2901     GST_LOG_OBJECT (queue->srcpad, "Peered query");
2902     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2903         "did query %p, return %d", query, queue->last_query);
2904     g_cond_signal (&queue->query_handled);
2905     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2906     result = GST_FLOW_OK;
2907   }
2908   return result;
2909
2910   /* ERRORS */
2911 no_item:
2912   {
2913     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2914         "exit because we have no item in the queue");
2915     return GST_FLOW_ERROR;
2916   }
2917 out_flushing:
2918   {
2919     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2920     return GST_FLOW_FLUSHING;
2921   }
2922 }
2923
2924 /* called repeatedly with @pad as the source pad. This function should push out
2925  * data to the peer element. */
2926 static void
2927 gst_queue2_loop (GstPad * pad)
2928 {
2929   GstQueue2 *queue;
2930   GstFlowReturn ret;
2931
2932   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2933
2934   /* have to lock for thread-safety */
2935   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2936
2937   if (gst_queue2_is_empty (queue)) {
2938     gboolean started;
2939
2940     /* pause the timer while we wait. The fact that we are waiting does not mean
2941      * the byterate on the output pad is lower */
2942     if ((started = queue->out_timer_started))
2943       g_timer_stop (queue->out_timer);
2944
2945     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2946         "queue is empty, waiting for new data");
2947     do {
2948       /* Wait for data to be available, we could be unlocked because of a flush. */
2949       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2950     }
2951     while (gst_queue2_is_empty (queue));
2952
2953     /* and continue if we were running before */
2954     if (started)
2955       g_timer_continue (queue->out_timer);
2956   }
2957   ret = gst_queue2_push_one (queue);
2958   queue->srcresult = ret;
2959   queue->sinkresult = ret;
2960   if (ret != GST_FLOW_OK)
2961     goto out_flushing;
2962
2963   GST_QUEUE2_MUTEX_UNLOCK (queue);
2964   gst_queue2_post_buffering (queue);
2965
2966   return;
2967
2968   /* ERRORS */
2969 out_flushing:
2970   {
2971     gboolean eos = queue->is_eos;
2972     GstFlowReturn ret = queue->srcresult;
2973
2974     gst_pad_pause_task (queue->srcpad);
2975     if (ret == GST_FLOW_FLUSHING) {
2976       gst_queue2_locked_flush (queue, FALSE, FALSE);
2977     } else {
2978       GST_QUEUE2_SIGNAL_DEL (queue);
2979       queue->last_query = FALSE;
2980       g_cond_signal (&queue->query_handled);
2981     }
2982     GST_QUEUE2_MUTEX_UNLOCK (queue);
2983     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2984         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
2985     /* let app know about us giving up if upstream is not expected to do so */
2986     /* EOS is already taken care of elsewhere */
2987     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
2988       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2989           (_("Internal data flow error.")),
2990           ("streaming task paused, reason %s (%d)",
2991               gst_flow_get_name (ret), ret));
2992       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
2993     }
2994     return;
2995   }
2996 }
2997
2998 static gboolean
2999 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3000 {
3001   gboolean res = TRUE;
3002   GstQueue2 *queue = GST_QUEUE2 (parent);
3003
3004 #ifndef GST_DISABLE_GST_DEBUG
3005   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
3006       event, GST_EVENT_TYPE_NAME (event));
3007 #endif
3008
3009   switch (GST_EVENT_TYPE (event)) {
3010     case GST_EVENT_FLUSH_START:
3011       if (QUEUE_IS_USING_QUEUE (queue)) {
3012         /* just forward upstream */
3013         res = gst_pad_push_event (queue->sinkpad, event);
3014       } else {
3015         /* now unblock the getrange function */
3016         GST_QUEUE2_MUTEX_LOCK (queue);
3017         GST_DEBUG_OBJECT (queue, "flushing");
3018         queue->srcresult = GST_FLOW_FLUSHING;
3019         GST_QUEUE2_SIGNAL_ADD (queue);
3020         GST_QUEUE2_MUTEX_UNLOCK (queue);
3021
3022         /* when using a temp file, we eat the event */
3023         res = TRUE;
3024         gst_event_unref (event);
3025       }
3026       break;
3027     case GST_EVENT_FLUSH_STOP:
3028       if (QUEUE_IS_USING_QUEUE (queue)) {
3029         /* just forward upstream */
3030         res = gst_pad_push_event (queue->sinkpad, event);
3031       } else {
3032         /* now unblock the getrange function */
3033         GST_QUEUE2_MUTEX_LOCK (queue);
3034         queue->srcresult = GST_FLOW_OK;
3035         GST_QUEUE2_MUTEX_UNLOCK (queue);
3036
3037         /* when using a temp file, we eat the event */
3038         res = TRUE;
3039         gst_event_unref (event);
3040       }
3041       break;
3042     case GST_EVENT_RECONFIGURE:
3043       GST_QUEUE2_MUTEX_LOCK (queue);
3044       /* assume downstream is linked now and try to push again */
3045       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
3046         queue->srcresult = GST_FLOW_OK;
3047         queue->sinkresult = GST_FLOW_OK;
3048         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
3049           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
3050               NULL);
3051         }
3052       }
3053       GST_QUEUE2_MUTEX_UNLOCK (queue);
3054
3055       res = gst_pad_push_event (queue->sinkpad, event);
3056       break;
3057     default:
3058       res = gst_pad_push_event (queue->sinkpad, event);
3059       break;
3060   }
3061
3062   return res;
3063 }
3064
3065 static gboolean
3066 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3067 {
3068   GstQueue2 *queue;
3069
3070   queue = GST_QUEUE2 (parent);
3071
3072   switch (GST_QUERY_TYPE (query)) {
3073     case GST_QUERY_POSITION:
3074     {
3075       gint64 peer_pos;
3076       GstFormat format;
3077
3078       if (!gst_pad_peer_query (queue->sinkpad, query))
3079         goto peer_failed;
3080
3081       /* get peer position */
3082       gst_query_parse_position (query, &format, &peer_pos);
3083
3084       /* FIXME: this code assumes that there's no discont in the queue */
3085       switch (format) {
3086         case GST_FORMAT_BYTES:
3087           peer_pos -= queue->cur_level.bytes;
3088           break;
3089         case GST_FORMAT_TIME:
3090           peer_pos -= queue->cur_level.time;
3091           break;
3092         default:
3093           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3094               "know how to adjust value", gst_format_get_name (format));
3095           return FALSE;
3096       }
3097       /* set updated position */
3098       gst_query_set_position (query, format, peer_pos);
3099       break;
3100     }
3101     case GST_QUERY_DURATION:
3102     {
3103       GST_DEBUG_OBJECT (queue, "doing peer query");
3104
3105       if (!gst_pad_peer_query (queue->sinkpad, query))
3106         goto peer_failed;
3107
3108       GST_DEBUG_OBJECT (queue, "peer query success");
3109       break;
3110     }
3111     case GST_QUERY_BUFFERING:
3112     {
3113       gint percent;
3114       gboolean is_buffering;
3115       GstBufferingMode mode;
3116       gint avg_in, avg_out;
3117       gint64 buffering_left;
3118
3119       GST_DEBUG_OBJECT (queue, "query buffering");
3120
3121       get_buffering_percent (queue, &is_buffering, &percent);
3122       gst_query_set_buffering_percent (query, is_buffering, percent);
3123
3124       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3125           &buffering_left);
3126       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3127           buffering_left);
3128
3129       if (!QUEUE_IS_USING_QUEUE (queue)) {
3130         /* add ranges for download and ringbuffer buffering */
3131         GstFormat format;
3132         gint64 start, stop, range_start, range_stop;
3133         guint64 writing_pos;
3134         gint64 estimated_total;
3135         gint64 duration;
3136         gboolean peer_res, is_eos;
3137         GstQueue2Range *queued_ranges;
3138
3139         /* we need a current download region */
3140         if (queue->current == NULL)
3141           return FALSE;
3142
3143         writing_pos = queue->current->writing_pos;
3144         is_eos = queue->is_eos;
3145
3146         if (is_eos) {
3147           /* we're EOS, we know the duration in bytes now */
3148           peer_res = TRUE;
3149           duration = writing_pos;
3150         } else {
3151           /* get duration of upstream in bytes */
3152           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3153               GST_FORMAT_BYTES, &duration);
3154         }
3155
3156         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3157             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3158
3159         /* calculate remaining and total download time */
3160         if (peer_res && avg_in > 0.0)
3161           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3162         else
3163           estimated_total = -1;
3164
3165         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3166             estimated_total);
3167
3168         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3169
3170         switch (format) {
3171           case GST_FORMAT_PERCENT:
3172             /* we need duration */
3173             if (!peer_res)
3174               goto peer_failed;
3175
3176             start = 0;
3177             /* get our available data relative to the duration */
3178             if (duration != -1)
3179               stop =
3180                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3181                   duration);
3182             else
3183               stop = -1;
3184             break;
3185           case GST_FORMAT_BYTES:
3186             start = 0;
3187             stop = writing_pos;
3188             break;
3189           default:
3190             start = -1;
3191             stop = -1;
3192             break;
3193         }
3194
3195         /* fill out the buffered ranges */
3196         for (queued_ranges = queue->ranges; queued_ranges;
3197             queued_ranges = queued_ranges->next) {
3198           switch (format) {
3199             case GST_FORMAT_PERCENT:
3200               if (duration == -1) {
3201                 range_start = 0;
3202                 range_stop = 0;
3203                 break;
3204               }
3205               range_start =
3206                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3207                   queued_ranges->offset, duration);
3208               range_stop =
3209                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3210                   queued_ranges->writing_pos, duration);
3211               break;
3212             case GST_FORMAT_BYTES:
3213               range_start = queued_ranges->offset;
3214               range_stop = queued_ranges->writing_pos;
3215               break;
3216             default:
3217               range_start = -1;
3218               range_stop = -1;
3219               break;
3220           }
3221           if (range_start == range_stop)
3222             continue;
3223           GST_DEBUG_OBJECT (queue,
3224               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3225               G_GINT64_FORMAT, range_start, range_stop);
3226           gst_query_add_buffering_range (query, range_start, range_stop);
3227         }
3228
3229         gst_query_set_buffering_range (query, format, start, stop,
3230             estimated_total);
3231       }
3232       break;
3233     }
3234     case GST_QUERY_SCHEDULING:
3235     {
3236       gboolean pull_mode;
3237       GstSchedulingFlags flags = 0;
3238
3239       if (!gst_pad_peer_query (queue->sinkpad, query))
3240         goto peer_failed;
3241
3242       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3243
3244       /* we can operate in pull mode when we are using a tempfile */
3245       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3246
3247       if (pull_mode)
3248         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3249       gst_query_set_scheduling (query, flags, 0, -1, 0);
3250       if (pull_mode)
3251         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3252       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3253       break;
3254     }
3255     default:
3256       /* peer handled other queries */
3257       if (!gst_pad_query_default (pad, parent, query))
3258         goto peer_failed;
3259       break;
3260   }
3261
3262   return TRUE;
3263
3264   /* ERRORS */
3265 peer_failed:
3266   {
3267     GST_DEBUG_OBJECT (queue, "failed peer query");
3268     return FALSE;
3269   }
3270 }
3271
3272 static gboolean
3273 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3274 {
3275   GstQueue2 *queue = GST_QUEUE2 (element);
3276
3277   /* simply forward to the srcpad query function */
3278   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3279       query);
3280 }
3281
3282 static void
3283 gst_queue2_update_upstream_size (GstQueue2 * queue)
3284 {
3285   gint64 upstream_size = -1;
3286
3287   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3288           &upstream_size)) {
3289     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3290
3291     /* upstream_size can be negative but queue->upstream_size is unsigned.
3292      * Prevent setting negative values to it (the query can return -1) */
3293     if (upstream_size >= 0)
3294       queue->upstream_size = upstream_size;
3295     else
3296       queue->upstream_size = 0;
3297   }
3298 }
3299
3300 static GstFlowReturn
3301 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3302     guint length, GstBuffer ** buffer)
3303 {
3304   GstQueue2 *queue;
3305   GstFlowReturn ret;
3306
3307   queue = GST_QUEUE2_CAST (parent);
3308
3309   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3310   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3311   offset = (offset == -1) ? queue->current->reading_pos : offset;
3312
3313   GST_DEBUG_OBJECT (queue,
3314       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3315
3316   /* catch any reads beyond the size of the file here to make sure queue2
3317    * doesn't send seek events beyond the size of the file upstream, since
3318    * that would confuse elements such as souphttpsrc and/or http servers.
3319    * Demuxers often just loop until EOS at the end of the file to figure out
3320    * when they've read all the end-headers or index chunks. */
3321   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3322     gst_queue2_update_upstream_size (queue);
3323     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3324       goto out_unexpected;
3325   }
3326
3327   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3328     gst_queue2_update_upstream_size (queue);
3329     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3330       length = queue->upstream_size - offset;
3331       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3332     }
3333   }
3334
3335   /* FIXME - function will block when the range is not yet available */
3336   ret = gst_queue2_create_read (queue, offset, length, buffer);
3337   GST_QUEUE2_MUTEX_UNLOCK (queue);
3338   gst_queue2_post_buffering (queue);
3339
3340   return ret;
3341
3342   /* ERRORS */
3343 out_flushing:
3344   {
3345     ret = queue->srcresult;
3346
3347     GST_DEBUG_OBJECT (queue, "we are flushing");
3348     GST_QUEUE2_MUTEX_UNLOCK (queue);
3349     return ret;
3350   }
3351 out_unexpected:
3352   {
3353     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3354     GST_QUEUE2_MUTEX_UNLOCK (queue);
3355     return GST_FLOW_EOS;
3356   }
3357 }
3358
3359 /* sink currently only operates in push mode */
3360 static gboolean
3361 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3362     GstPadMode mode, gboolean active)
3363 {
3364   gboolean result;
3365   GstQueue2 *queue;
3366
3367   queue = GST_QUEUE2 (parent);
3368
3369   switch (mode) {
3370     case GST_PAD_MODE_PUSH:
3371       if (active) {
3372         GST_QUEUE2_MUTEX_LOCK (queue);
3373         GST_DEBUG_OBJECT (queue, "activating push mode");
3374         queue->srcresult = GST_FLOW_OK;
3375         queue->sinkresult = GST_FLOW_OK;
3376         queue->is_eos = FALSE;
3377         queue->unexpected = FALSE;
3378         reset_rate_timer (queue);
3379         GST_QUEUE2_MUTEX_UNLOCK (queue);
3380       } else {
3381         /* unblock chain function */
3382         GST_QUEUE2_MUTEX_LOCK (queue);
3383         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3384         queue->srcresult = GST_FLOW_FLUSHING;
3385         queue->sinkresult = GST_FLOW_FLUSHING;
3386         GST_QUEUE2_SIGNAL_DEL (queue);
3387         /* Unblock query handler */
3388         queue->last_query = FALSE;
3389         g_cond_signal (&queue->query_handled);
3390         GST_QUEUE2_MUTEX_UNLOCK (queue);
3391
3392         /* wait until it is unblocked and clean up */
3393         GST_PAD_STREAM_LOCK (pad);
3394         GST_QUEUE2_MUTEX_LOCK (queue);
3395         gst_queue2_locked_flush (queue, TRUE, FALSE);
3396         GST_QUEUE2_MUTEX_UNLOCK (queue);
3397         GST_PAD_STREAM_UNLOCK (pad);
3398       }
3399       result = TRUE;
3400       break;
3401     default:
3402       result = FALSE;
3403       break;
3404   }
3405   return result;
3406 }
3407
3408 /* src operating in push mode, we start a task on the source pad that pushes out
3409  * buffers from the queue */
3410 static gboolean
3411 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3412 {
3413   gboolean result = FALSE;
3414   GstQueue2 *queue;
3415
3416   queue = GST_QUEUE2 (parent);
3417
3418   if (active) {
3419     GST_QUEUE2_MUTEX_LOCK (queue);
3420     GST_DEBUG_OBJECT (queue, "activating push mode");
3421     queue->srcresult = GST_FLOW_OK;
3422     queue->sinkresult = GST_FLOW_OK;
3423     queue->is_eos = FALSE;
3424     queue->unexpected = FALSE;
3425     result =
3426         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3427     GST_QUEUE2_MUTEX_UNLOCK (queue);
3428   } else {
3429     /* unblock loop function */
3430     GST_QUEUE2_MUTEX_LOCK (queue);
3431     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3432     queue->srcresult = GST_FLOW_FLUSHING;
3433     queue->sinkresult = GST_FLOW_FLUSHING;
3434     /* the item add signal will unblock */
3435     GST_QUEUE2_SIGNAL_ADD (queue);
3436     GST_QUEUE2_MUTEX_UNLOCK (queue);
3437
3438     /* step 2, make sure streaming finishes */
3439     result = gst_pad_stop_task (pad);
3440   }
3441
3442   return result;
3443 }
3444
3445 /* pull mode, downstream will call our getrange function */
3446 static gboolean
3447 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3448 {
3449   gboolean result;
3450   GstQueue2 *queue;
3451
3452   queue = GST_QUEUE2 (parent);
3453
3454   if (active) {
3455     GST_QUEUE2_MUTEX_LOCK (queue);
3456     if (!QUEUE_IS_USING_QUEUE (queue)) {
3457       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3458         /* open the temp file now */
3459         result = gst_queue2_open_temp_location_file (queue);
3460       } else if (!queue->ring_buffer) {
3461         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3462         result = ! !queue->ring_buffer;
3463       } else {
3464         result = TRUE;
3465       }
3466
3467       GST_DEBUG_OBJECT (queue, "activating pull mode");
3468       init_ranges (queue);
3469       queue->srcresult = GST_FLOW_OK;
3470       queue->sinkresult = GST_FLOW_OK;
3471       queue->is_eos = FALSE;
3472       queue->unexpected = FALSE;
3473       queue->upstream_size = 0;
3474     } else {
3475       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3476       /* this is not allowed, we cannot operate in pull mode without a temp
3477        * file. */
3478       queue->srcresult = GST_FLOW_FLUSHING;
3479       queue->sinkresult = GST_FLOW_FLUSHING;
3480       result = FALSE;
3481     }
3482     GST_QUEUE2_MUTEX_UNLOCK (queue);
3483   } else {
3484     GST_QUEUE2_MUTEX_LOCK (queue);
3485     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3486     queue->srcresult = GST_FLOW_FLUSHING;
3487     queue->sinkresult = GST_FLOW_FLUSHING;
3488     /* this will unlock getrange */
3489     GST_QUEUE2_SIGNAL_ADD (queue);
3490     result = TRUE;
3491     GST_QUEUE2_MUTEX_UNLOCK (queue);
3492   }
3493
3494   return result;
3495 }
3496
3497 static gboolean
3498 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3499     gboolean active)
3500 {
3501   gboolean res;
3502
3503   switch (mode) {
3504     case GST_PAD_MODE_PULL:
3505       res = gst_queue2_src_activate_pull (pad, parent, active);
3506       break;
3507     case GST_PAD_MODE_PUSH:
3508       res = gst_queue2_src_activate_push (pad, parent, active);
3509       break;
3510     default:
3511       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3512       res = FALSE;
3513       break;
3514   }
3515   return res;
3516 }
3517
3518 static GstStateChangeReturn
3519 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3520 {
3521   GstQueue2 *queue;
3522   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3523
3524   queue = GST_QUEUE2 (element);
3525
3526   switch (transition) {
3527     case GST_STATE_CHANGE_NULL_TO_READY:
3528       break;
3529     case GST_STATE_CHANGE_READY_TO_PAUSED:
3530       GST_QUEUE2_MUTEX_LOCK (queue);
3531       if (!QUEUE_IS_USING_QUEUE (queue)) {
3532         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3533           if (!gst_queue2_open_temp_location_file (queue))
3534             ret = GST_STATE_CHANGE_FAILURE;
3535         } else {
3536           if (queue->ring_buffer) {
3537             g_free (queue->ring_buffer);
3538             queue->ring_buffer = NULL;
3539           }
3540           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3541             ret = GST_STATE_CHANGE_FAILURE;
3542         }
3543         init_ranges (queue);
3544       }
3545       queue->segment_event_received = FALSE;
3546       queue->starting_segment = NULL;
3547       gst_event_replace (&queue->stream_start_event, NULL);
3548       GST_QUEUE2_MUTEX_UNLOCK (queue);
3549       break;
3550     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3551       break;
3552     default:
3553       break;
3554   }
3555
3556   if (ret == GST_STATE_CHANGE_FAILURE)
3557     return ret;
3558
3559   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3560
3561   if (ret == GST_STATE_CHANGE_FAILURE)
3562     return ret;
3563
3564   switch (transition) {
3565     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3566       break;
3567     case GST_STATE_CHANGE_PAUSED_TO_READY:
3568       GST_QUEUE2_MUTEX_LOCK (queue);
3569       if (!QUEUE_IS_USING_QUEUE (queue)) {
3570         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3571           gst_queue2_close_temp_location_file (queue);
3572         } else if (queue->ring_buffer) {
3573           g_free (queue->ring_buffer);
3574           queue->ring_buffer = NULL;
3575         }
3576         clean_ranges (queue);
3577       }
3578       if (queue->starting_segment != NULL) {
3579         gst_event_unref (queue->starting_segment);
3580         queue->starting_segment = NULL;
3581       }
3582       gst_event_replace (&queue->stream_start_event, NULL);
3583       GST_QUEUE2_MUTEX_UNLOCK (queue);
3584       break;
3585     case GST_STATE_CHANGE_READY_TO_NULL:
3586       break;
3587     default:
3588       break;
3589   }
3590
3591   return ret;
3592 }
3593
3594 /* changing the capacity of the queue must wake up
3595  * the _chain function, it might have more room now
3596  * to store the buffer/event in the queue */
3597 #define QUEUE_CAPACITY_CHANGE(q) \
3598   GST_QUEUE2_SIGNAL_DEL (queue); \
3599   if (queue->use_buffering)      \
3600     update_buffering (queue);
3601
3602 /* Changing the minimum required fill level must
3603  * wake up the _loop function as it might now
3604  * be able to preceed.
3605  */
3606 #define QUEUE_THRESHOLD_CHANGE(q)\
3607   GST_QUEUE2_SIGNAL_ADD (queue);
3608
3609 static void
3610 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3611 {
3612   GstState state;
3613
3614   /* the element must be stopped in order to do this */
3615   GST_OBJECT_LOCK (queue);
3616   state = GST_STATE (queue);
3617   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3618     goto wrong_state;
3619   GST_OBJECT_UNLOCK (queue);
3620
3621   /* set new location */
3622   g_free (queue->temp_template);
3623   queue->temp_template = g_strdup (template);
3624
3625   return;
3626
3627 /* ERROR */
3628 wrong_state:
3629   {
3630     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3631     GST_OBJECT_UNLOCK (queue);
3632   }
3633 }
3634
3635 static void
3636 gst_queue2_set_property (GObject * object,
3637     guint prop_id, const GValue * value, GParamSpec * pspec)
3638 {
3639   GstQueue2 *queue = GST_QUEUE2 (object);
3640
3641   /* someone could change levels here, and since this
3642    * affects the get/put funcs, we need to lock for safety. */
3643   GST_QUEUE2_MUTEX_LOCK (queue);
3644
3645   switch (prop_id) {
3646     case PROP_MAX_SIZE_BYTES:
3647       queue->max_level.bytes = g_value_get_uint (value);
3648       QUEUE_CAPACITY_CHANGE (queue);
3649       break;
3650     case PROP_MAX_SIZE_BUFFERS:
3651       queue->max_level.buffers = g_value_get_uint (value);
3652       QUEUE_CAPACITY_CHANGE (queue);
3653       break;
3654     case PROP_MAX_SIZE_TIME:
3655       queue->max_level.time = g_value_get_uint64 (value);
3656       /* set rate_time to the same value. We use an extra field in the level
3657        * structure so that we can easily access and compare it */
3658       queue->max_level.rate_time = queue->max_level.time;
3659       QUEUE_CAPACITY_CHANGE (queue);
3660       break;
3661     case PROP_USE_BUFFERING:
3662       queue->use_buffering = g_value_get_boolean (value);
3663       if (!queue->use_buffering && queue->is_buffering) {
3664         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3665             "posting 100%% message");
3666         SET_PERCENT (queue, 100);
3667         queue->is_buffering = FALSE;
3668       }
3669
3670       if (queue->use_buffering) {
3671         queue->is_buffering = TRUE;
3672         update_buffering (queue);
3673       }
3674       break;
3675     case PROP_USE_TAGS_BITRATE:
3676       queue->use_tags_bitrate = g_value_get_boolean (value);
3677       break;
3678     case PROP_USE_RATE_ESTIMATE:
3679       queue->use_rate_estimate = g_value_get_boolean (value);
3680       break;
3681     case PROP_LOW_PERCENT:
3682       queue->low_percent = g_value_get_int (value);
3683       break;
3684     case PROP_HIGH_PERCENT:
3685       queue->high_percent = g_value_get_int (value);
3686       break;
3687     case PROP_TEMP_TEMPLATE:
3688       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3689       break;
3690     case PROP_TEMP_REMOVE:
3691       queue->temp_remove = g_value_get_boolean (value);
3692       break;
3693     case PROP_RING_BUFFER_MAX_SIZE:
3694       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3695       break;
3696     default:
3697       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3698       break;
3699   }
3700
3701   GST_QUEUE2_MUTEX_UNLOCK (queue);
3702   gst_queue2_post_buffering (queue);
3703 }
3704
3705 static void
3706 gst_queue2_get_property (GObject * object,
3707     guint prop_id, GValue * value, GParamSpec * pspec)
3708 {
3709   GstQueue2 *queue = GST_QUEUE2 (object);
3710
3711   GST_QUEUE2_MUTEX_LOCK (queue);
3712
3713   switch (prop_id) {
3714     case PROP_CUR_LEVEL_BYTES:
3715       g_value_set_uint (value, queue->cur_level.bytes);
3716       break;
3717     case PROP_CUR_LEVEL_BUFFERS:
3718       g_value_set_uint (value, queue->cur_level.buffers);
3719       break;
3720     case PROP_CUR_LEVEL_TIME:
3721       g_value_set_uint64 (value, queue->cur_level.time);
3722       break;
3723     case PROP_MAX_SIZE_BYTES:
3724       g_value_set_uint (value, queue->max_level.bytes);
3725       break;
3726     case PROP_MAX_SIZE_BUFFERS:
3727       g_value_set_uint (value, queue->max_level.buffers);
3728       break;
3729     case PROP_MAX_SIZE_TIME:
3730       g_value_set_uint64 (value, queue->max_level.time);
3731       break;
3732     case PROP_USE_BUFFERING:
3733       g_value_set_boolean (value, queue->use_buffering);
3734       break;
3735     case PROP_USE_TAGS_BITRATE:
3736       g_value_set_boolean (value, queue->use_tags_bitrate);
3737       break;
3738     case PROP_USE_RATE_ESTIMATE:
3739       g_value_set_boolean (value, queue->use_rate_estimate);
3740       break;
3741     case PROP_LOW_PERCENT:
3742       g_value_set_int (value, queue->low_percent);
3743       break;
3744     case PROP_HIGH_PERCENT:
3745       g_value_set_int (value, queue->high_percent);
3746       break;
3747     case PROP_TEMP_TEMPLATE:
3748       g_value_set_string (value, queue->temp_template);
3749       break;
3750     case PROP_TEMP_LOCATION:
3751       g_value_set_string (value, queue->temp_location);
3752       break;
3753     case PROP_TEMP_REMOVE:
3754       g_value_set_boolean (value, queue->temp_remove);
3755       break;
3756     case PROP_RING_BUFFER_MAX_SIZE:
3757       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3758       break;
3759     case PROP_AVG_IN_RATE:
3760     {
3761       gdouble in_rate = queue->byte_in_rate;
3762
3763       /* During the first RATE_INTERVAL, byte_in_rate will not have been
3764        * calculated, so calculate it here. */
3765       if (in_rate == 0.0 && queue->bytes_in
3766           && queue->last_update_in_rates_elapsed > 0.0)
3767         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
3768
3769       g_value_set_int64 (value, (gint64) in_rate);
3770       break;
3771     }
3772     default:
3773       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3774       break;
3775   }
3776
3777   GST_QUEUE2_MUTEX_UNLOCK (queue);
3778 }