queue2: Handle buffering levels on NOT_LINKED
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  * @title: queue2
29  *
30  * Data is queued until one of the limits specified by the
31  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
32  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
33  * more buffers into the queue will block the pushing thread until more space
34  * becomes available.
35  *
36  * The queue will create a new thread on the source pad to decouple the
37  * processing on sink and source pad.
38  *
39  * You can query how many buffers are queued by reading the
40  * #GstQueue2:current-level-buffers property.
41  *
42  * The default queue size limits are 100 buffers, 2MB of data, or
43  * two seconds worth of data, whichever is reached first.
44  *
45  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
46  * will allocate a random free filename and buffer data in the file.
47  * By using this, it will buffer the entire stream data on the file independently
48  * of the queue size limits, they will only be used for buffering statistics.
49  *
50  * The temp-location property will be used to notify the application of the
51  * allocated filename.
52  */
53
54 #ifdef HAVE_CONFIG_H
55 #include "config.h"
56 #endif
57
58 #include "gstqueue2.h"
59
60 #include <glib/gstdio.h>
61
62 #include "gst/gst-i18n-lib.h"
63 #include "gst/glib-compat-private.h"
64
65 #include <string.h>
66
67 #ifdef G_OS_WIN32
68 #include <io.h>                 /* lseek, open, close, read */
69 #undef lseek
70 #define lseek _lseeki64
71 #undef off_t
72 #define off_t guint64
73 #else
74 #include <unistd.h>
75 #endif
76
77 #ifdef __BIONIC__               /* Android */
78 #undef lseek
79 #define lseek lseek64
80 #undef off_t
81 #define off_t guint64
82 #include <fcntl.h>
83 #endif
84
85 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
86     GST_PAD_SINK,
87     GST_PAD_ALWAYS,
88     GST_STATIC_CAPS_ANY);
89
90 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
91     GST_PAD_SRC,
92     GST_PAD_ALWAYS,
93     GST_STATIC_CAPS_ANY);
94
95 GST_DEBUG_CATEGORY_STATIC (queue_debug);
96 #define GST_CAT_DEFAULT (queue_debug)
97 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
98
99 enum
100 {
101   LAST_SIGNAL
102 };
103
104 /* other defines */
105 #define DEFAULT_BUFFER_SIZE 4096
106 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
107 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
108 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
109
110 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
111
112 /* default property values */
113 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
114 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
115 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
116 #define DEFAULT_USE_BUFFERING      FALSE
117 #define DEFAULT_USE_TAGS_BITRATE   FALSE
118 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
119 #define DEFAULT_LOW_PERCENT        10
120 #define DEFAULT_HIGH_PERCENT       99
121 #define DEFAULT_LOW_WATERMARK      0.01
122 #define DEFAULT_HIGH_WATERMARK     0.99
123 #define DEFAULT_TEMP_REMOVE        TRUE
124 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
125
126 enum
127 {
128   PROP_0,
129   PROP_CUR_LEVEL_BUFFERS,
130   PROP_CUR_LEVEL_BYTES,
131   PROP_CUR_LEVEL_TIME,
132   PROP_MAX_SIZE_BUFFERS,
133   PROP_MAX_SIZE_BYTES,
134   PROP_MAX_SIZE_TIME,
135   PROP_USE_BUFFERING,
136   PROP_USE_TAGS_BITRATE,
137   PROP_USE_RATE_ESTIMATE,
138   PROP_LOW_PERCENT,
139   PROP_HIGH_PERCENT,
140   PROP_LOW_WATERMARK,
141   PROP_HIGH_WATERMARK,
142   PROP_TEMP_TEMPLATE,
143   PROP_TEMP_LOCATION,
144   PROP_TEMP_REMOVE,
145   PROP_RING_BUFFER_MAX_SIZE,
146   PROP_AVG_IN_RATE,
147   PROP_LAST
148 };
149
150 /* Explanation for buffer levels and percentages:
151  *
152  * The buffering_level functions here return a value in a normalized range
153  * that specifies the queue's current fill level. The range goes from 0 to
154  * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
155  *
156  * This is not to be confused with the buffering_percent value, which is
157  * a *relative* quantity - relative to the low/high watermarks.
158  * buffering_percent = 0% means buffering_level is at the low watermark.
159  * buffering_percent = 100% means buffering_level is at the high watermark.
160  * buffering_percent is used for determining if the fill level has reached
161  * the high watermark, and for producing BUFFERING messages. This value
162  * always uses a 0..100 range (since it is a percentage).
163  *
164  * To avoid future confusions, whenever "buffering level" is mentioned, it
165  * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
166  * range. Whenever "buffering_percent" is mentioned, it refers to the
167  * percentage value that is relative to the low/high watermark. */
168
169 /* Using a buffering level range of 0..1000000 to allow for a
170  * resolution in ppm (1 ppm = 0.0001%) */
171 #define MAX_BUFFERING_LEVEL 1000000
172
173 /* How much 1% makes up in the buffer level range */
174 #define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
175
176 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
177   l.buffers = 0;                                        \
178   l.bytes = 0;                                          \
179   l.time = 0;                                           \
180   l.rate_time = 0;                                      \
181 } G_STMT_END
182
183 #define STATUS(queue, pad, msg) \
184   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
185                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
186                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
187                       " ns, %"G_GUINT64_FORMAT" items", \
188                       GST_DEBUG_PAD_NAME (pad), \
189                       queue->cur_level.buffers, \
190                       queue->max_level.buffers, \
191                       queue->cur_level.bytes, \
192                       queue->max_level.bytes, \
193                       queue->cur_level.time, \
194                       queue->max_level.time, \
195                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
196                         queue->current->writing_pos - queue->current->max_reading_pos : \
197                         queue->queue.length))
198
199 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
200   g_mutex_lock (&q->qlock);                                              \
201 } G_STMT_END
202
203 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
204   GST_QUEUE2_MUTEX_LOCK (q);                                            \
205   if (res != GST_FLOW_OK)                                               \
206     goto label;                                                         \
207 } G_STMT_END
208
209 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
210   g_mutex_unlock (&q->qlock);                                            \
211 } G_STMT_END
212
213 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
214   STATUS (queue, q->sinkpad, "wait for DEL");                           \
215   q->waiting_del = TRUE;                                                \
216   g_cond_wait (&q->item_del, &queue->qlock);                              \
217   q->waiting_del = FALSE;                                               \
218   if (res != GST_FLOW_OK) {                                             \
219     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
220     goto label;                                                         \
221   }                                                                     \
222   STATUS (queue, q->sinkpad, "received DEL");                           \
223 } G_STMT_END
224
225 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
226   STATUS (queue, q->srcpad, "wait for ADD");                            \
227   q->waiting_add = TRUE;                                                \
228   g_cond_wait (&q->item_add, &q->qlock);                                  \
229   q->waiting_add = FALSE;                                               \
230   if (res != GST_FLOW_OK) {                                             \
231     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
232     goto label;                                                         \
233   }                                                                     \
234   STATUS (queue, q->srcpad, "received ADD");                            \
235 } G_STMT_END
236
237 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
238   if (q->waiting_del) {                                                 \
239     STATUS (q, q->srcpad, "signal DEL");                                \
240     g_cond_signal (&q->item_del);                                        \
241   }                                                                     \
242 } G_STMT_END
243
244 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
245   if (q->waiting_add) {                                                 \
246     STATUS (q, q->sinkpad, "signal ADD");                               \
247     g_cond_signal (&q->item_add);                                        \
248   }                                                                     \
249 } G_STMT_END
250
251 #define SET_PERCENT(q, perc) G_STMT_START {                              \
252   if (perc != q->buffering_percent) {                                    \
253     q->buffering_percent = perc;                                         \
254     q->percent_changed = TRUE;                                           \
255     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
256     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
257         &q->buffering_left);                                             \
258   }                                                                      \
259 } G_STMT_END
260
261 #define _do_init \
262     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
263     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
264         "dataflow inside the queue element");
265 #define gst_queue2_parent_class parent_class
266 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
267
268 static void gst_queue2_finalize (GObject * object);
269
270 static void gst_queue2_set_property (GObject * object,
271     guint prop_id, const GValue * value, GParamSpec * pspec);
272 static void gst_queue2_get_property (GObject * object,
273     guint prop_id, GValue * value, GParamSpec * pspec);
274
275 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
276     GstBuffer * buffer);
277 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
278     GstBufferList * buffer_list);
279 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
280 static void gst_queue2_loop (GstPad * pad);
281
282 static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad,
283     GstObject * parent, GstEvent * event);
284 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
285     GstQuery * query);
286
287 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
288     GstEvent * event);
289 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
290     GstQuery * query);
291 static gboolean gst_queue2_handle_query (GstElement * element,
292     GstQuery * query);
293
294 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
295     guint64 offset, guint length, GstBuffer ** buffer);
296
297 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
298     GstPadMode mode, gboolean active);
299 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
300     GstPadMode mode, gboolean active);
301 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
302     GstStateChange transition);
303
304 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
305 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
306
307 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
308 static void update_in_rates (GstQueue2 * queue, gboolean force);
309 static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue);
310 static void gst_queue2_post_buffering (GstQueue2 * queue);
311
312 typedef enum
313 {
314   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
315   GST_QUEUE2_ITEM_TYPE_BUFFER,
316   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
317   GST_QUEUE2_ITEM_TYPE_EVENT,
318   GST_QUEUE2_ITEM_TYPE_QUERY
319 } GstQueue2ItemType;
320
321 typedef struct
322 {
323   GstQueue2ItemType type;
324   GstMiniObject *item;
325 } GstQueue2Item;
326
327 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
328
329 static void
330 gst_queue2_class_init (GstQueue2Class * klass)
331 {
332   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
333   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
334
335   gobject_class->set_property = gst_queue2_set_property;
336   gobject_class->get_property = gst_queue2_get_property;
337
338   /* properties */
339   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
340       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
341           "Current amount of data in the queue (bytes)",
342           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
343   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
344       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
345           "Current number of buffers in the queue",
346           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
347   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
348       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
349           "Current amount of data in the queue (in ns)",
350           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
351
352   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
353       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
354           "Max. amount of data in the queue (bytes, 0=disable)",
355           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
356           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
357           G_PARAM_STATIC_STRINGS));
358   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
359       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
360           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
361           DEFAULT_MAX_SIZE_BUFFERS,
362           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
363           G_PARAM_STATIC_STRINGS));
364   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
365       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
366           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
367           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
368           G_PARAM_STATIC_STRINGS));
369
370   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
371       g_param_spec_boolean ("use-buffering", "Use buffering",
372           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
373           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
374           G_PARAM_STATIC_STRINGS));
375   g_object_class_install_property (gobject_class, PROP_USE_TAGS_BITRATE,
376       g_param_spec_boolean ("use-tags-bitrate", "Use bitrate from tags",
377           "Use a bitrate from upstream tags to estimate buffer duration if not provided",
378           DEFAULT_USE_TAGS_BITRATE,
379           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
380           G_PARAM_STATIC_STRINGS));
381   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
382       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
383           "Estimate the bitrate of the stream to calculate time level",
384           DEFAULT_USE_RATE_ESTIMATE,
385           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
386   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
387       g_param_spec_int ("low-percent", "Low percent",
388           "Low threshold for buffering to start. Only used if use-buffering is True "
389           "(Deprecated: use low-watermark instead)",
390           0, 100, DEFAULT_LOW_WATERMARK * 100,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
393       g_param_spec_int ("high-percent", "High percent",
394           "High threshold for buffering to finish. Only used if use-buffering is True "
395           "(Deprecated: use high-watermark instead)",
396           0, 100, DEFAULT_HIGH_WATERMARK * 100,
397           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398   g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
399       g_param_spec_double ("low-watermark", "Low watermark",
400           "Low threshold for buffering to start. Only used if use-buffering is True",
401           0.0, 1.0, DEFAULT_LOW_WATERMARK,
402           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
403   g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
404       g_param_spec_double ("high-watermark", "High watermark",
405           "High threshold for buffering to finish. Only used if use-buffering is True",
406           0.0, 1.0, DEFAULT_HIGH_WATERMARK,
407           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
408
409   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
410       g_param_spec_string ("temp-template", "Temporary File Template",
411           "File template to store temporary files in, should contain directory "
412           "and XXXXXX. (NULL == disabled)",
413           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
414
415   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
416       g_param_spec_string ("temp-location", "Temporary File Location",
417           "Location to store temporary files in (Only read this property, "
418           "use temp-template to configure the name template)",
419           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
420
421   /**
422    * GstQueue2:temp-remove
423    *
424    * When temp-template is set, remove the temporary file when going to READY.
425    */
426   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
427       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
428           "Remove the temp-location after use",
429           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
430
431   /**
432    * GstQueue2:ring-buffer-max-size
433    *
434    * The maximum size of the ring buffer in bytes. If set to 0, the ring
435    * buffer is disabled. Default 0.
436    */
437   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
438       g_param_spec_uint64 ("ring-buffer-max-size",
439           "Max. ring buffer size (bytes)",
440           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
441           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
442           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
443
444   /**
445    * GstQueue2:avg-in-rate
446    *
447    * The average input data rate.
448    */
449   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
450       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
451           "Average input data rate (bytes/s)",
452           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
453
454   /* set several parent class virtual functions */
455   gobject_class->finalize = gst_queue2_finalize;
456
457   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
458   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
459
460   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
461       "Generic",
462       "Simple data queue",
463       "Erik Walthinsen <omega@cse.ogi.edu>, "
464       "Wim Taymans <wim.taymans@gmail.com>");
465
466   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
467   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
468 }
469
470 static void
471 gst_queue2_init (GstQueue2 * queue)
472 {
473   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
474
475   gst_pad_set_chain_function (queue->sinkpad,
476       GST_DEBUG_FUNCPTR (gst_queue2_chain));
477   gst_pad_set_chain_list_function (queue->sinkpad,
478       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
479   gst_pad_set_activatemode_function (queue->sinkpad,
480       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
481   gst_pad_set_event_full_function (queue->sinkpad,
482       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
483   gst_pad_set_query_function (queue->sinkpad,
484       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
485   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
486   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
487
488   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
489
490   gst_pad_set_activatemode_function (queue->srcpad,
491       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
492   gst_pad_set_getrange_function (queue->srcpad,
493       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
494   gst_pad_set_event_function (queue->srcpad,
495       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
496   gst_pad_set_query_function (queue->srcpad,
497       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
498   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
499   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
500
501   /* levels */
502   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
503   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
504   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
505   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
506   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
507   queue->use_buffering = DEFAULT_USE_BUFFERING;
508   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
509   queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
510   queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
511
512   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
513   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
514
515   queue->sinktime = GST_CLOCK_TIME_NONE;
516   queue->srctime = GST_CLOCK_TIME_NONE;
517   queue->sink_tainted = TRUE;
518   queue->src_tainted = TRUE;
519
520   queue->srcresult = GST_FLOW_FLUSHING;
521   queue->sinkresult = GST_FLOW_FLUSHING;
522   queue->is_eos = FALSE;
523   queue->in_timer = g_timer_new ();
524   queue->out_timer = g_timer_new ();
525
526   g_mutex_init (&queue->qlock);
527   queue->waiting_add = FALSE;
528   g_cond_init (&queue->item_add);
529   queue->waiting_del = FALSE;
530   g_cond_init (&queue->item_del);
531   g_queue_init (&queue->queue);
532
533   g_cond_init (&queue->query_handled);
534   queue->last_query = FALSE;
535
536   g_mutex_init (&queue->buffering_post_lock);
537   queue->buffering_percent = 100;
538
539   /* tempfile related */
540   queue->temp_template = NULL;
541   queue->temp_location = NULL;
542   queue->temp_remove = DEFAULT_TEMP_REMOVE;
543
544   queue->ring_buffer = NULL;
545   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
546
547   GST_DEBUG_OBJECT (queue,
548       "initialized queue's not_empty & not_full conditions");
549 }
550
551 /* called only once, as opposed to dispose */
552 static void
553 gst_queue2_finalize (GObject * object)
554 {
555   GstQueue2 *queue = GST_QUEUE2 (object);
556
557   GST_DEBUG_OBJECT (queue, "finalizing queue");
558
559   while (!g_queue_is_empty (&queue->queue)) {
560     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
561
562     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
563       gst_mini_object_unref (qitem->item);
564     g_slice_free (GstQueue2Item, qitem);
565   }
566
567   queue->last_query = FALSE;
568   g_queue_clear (&queue->queue);
569   g_mutex_clear (&queue->qlock);
570   g_mutex_clear (&queue->buffering_post_lock);
571   g_cond_clear (&queue->item_add);
572   g_cond_clear (&queue->item_del);
573   g_cond_clear (&queue->query_handled);
574   g_timer_destroy (queue->in_timer);
575   g_timer_destroy (queue->out_timer);
576
577   /* temp_file path cleanup  */
578   g_free (queue->temp_template);
579   g_free (queue->temp_location);
580
581   G_OBJECT_CLASS (parent_class)->finalize (object);
582 }
583
584 static void
585 debug_ranges (GstQueue2 * queue)
586 {
587   GstQueue2Range *walk;
588
589   for (walk = queue->ranges; walk; walk = walk->next) {
590     GST_DEBUG_OBJECT (queue,
591         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
592         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
593         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
594         walk->rb_writing_pos, walk->reading_pos,
595         walk == queue->current ? "**y**" : "  n  ");
596   }
597 }
598
599 /* clear all the downloaded ranges */
600 static void
601 clean_ranges (GstQueue2 * queue)
602 {
603   GST_DEBUG_OBJECT (queue, "clean queue ranges");
604
605   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
606   queue->ranges = NULL;
607   queue->current = NULL;
608 }
609
610 /* find a range that contains @offset or NULL when nothing does */
611 static GstQueue2Range *
612 find_range (GstQueue2 * queue, guint64 offset)
613 {
614   GstQueue2Range *range = NULL;
615   GstQueue2Range *walk;
616
617   /* first do a quick check for the current range */
618   for (walk = queue->ranges; walk; walk = walk->next) {
619     if (offset >= walk->offset && offset <= walk->writing_pos) {
620       /* we can reuse an existing range */
621       range = walk;
622       break;
623     }
624   }
625   if (range) {
626     GST_DEBUG_OBJECT (queue,
627         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
628         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
629   } else {
630     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
631   }
632   return range;
633 }
634
635 static void
636 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
637 {
638   guint64 max_reading_pos, writing_pos;
639
640   writing_pos = range->writing_pos;
641   max_reading_pos = range->max_reading_pos;
642
643   if (writing_pos > max_reading_pos)
644     queue->cur_level.bytes = writing_pos - max_reading_pos;
645   else
646     queue->cur_level.bytes = 0;
647 }
648
649 /* make a new range for @offset or reuse an existing range */
650 static GstQueue2Range *
651 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
652 {
653   GstQueue2Range *range, *prev, *next;
654
655   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
656
657   if ((range = find_range (queue, offset))) {
658     GST_DEBUG_OBJECT (queue,
659         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
660         range->writing_pos);
661     if (update_existing && range->writing_pos != offset) {
662       GST_DEBUG_OBJECT (queue, "updating range writing position to "
663           "%" G_GUINT64_FORMAT, offset);
664       range->writing_pos = offset;
665     }
666   } else {
667     GST_DEBUG_OBJECT (queue,
668         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
669
670     range = g_slice_new0 (GstQueue2Range);
671     range->offset = offset;
672     /* we want to write to the next location in the ring buffer */
673     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
674     range->writing_pos = offset;
675     range->rb_writing_pos = range->rb_offset;
676     range->reading_pos = offset;
677     range->max_reading_pos = offset;
678
679     /* insert sorted */
680     prev = NULL;
681     next = queue->ranges;
682     while (next) {
683       if (next->offset > offset) {
684         /* insert before next */
685         GST_DEBUG_OBJECT (queue,
686             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
687             next->offset);
688         break;
689       }
690       /* try next */
691       prev = next;
692       next = next->next;
693     }
694     range->next = next;
695     if (prev)
696       prev->next = range;
697     else
698       queue->ranges = range;
699   }
700   debug_ranges (queue);
701
702   /* update the stats for this range */
703   update_cur_level (queue, range);
704
705   return range;
706 }
707
708
709 /* clear and init the download ranges for offset 0 */
710 static void
711 init_ranges (GstQueue2 * queue)
712 {
713   GST_DEBUG_OBJECT (queue, "init queue ranges");
714
715   /* get rid of all the current ranges */
716   clean_ranges (queue);
717   /* make a range for offset 0 */
718   queue->current = add_range (queue, 0, TRUE);
719 }
720
721 /* calculate the diff between running time on the sink and src of the queue.
722  * This is the total amount of time in the queue. */
723 static void
724 update_time_level (GstQueue2 * queue)
725 {
726   if (queue->sink_tainted) {
727     queue->sinktime =
728         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
729         queue->sink_segment.position);
730     queue->sink_tainted = FALSE;
731   }
732
733   if (queue->src_tainted) {
734     queue->srctime =
735         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
736         queue->src_segment.position);
737     queue->src_tainted = FALSE;
738   }
739
740   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
741       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
742
743   if (queue->sinktime != GST_CLOCK_TIME_NONE
744       && queue->srctime != GST_CLOCK_TIME_NONE
745       && queue->sinktime >= queue->srctime)
746     queue->cur_level.time = queue->sinktime - queue->srctime;
747   else
748     queue->cur_level.time = 0;
749 }
750
751 /* take a SEGMENT event and apply the values to segment, updating the time
752  * level of queue. */
753 static void
754 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
755     gboolean is_sink)
756 {
757   gst_event_copy_segment (event, segment);
758
759   if (segment->format == GST_FORMAT_BYTES) {
760     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
761       /* start is where we'll be getting from and as such writing next */
762       queue->current = add_range (queue, segment->start, TRUE);
763     }
764   }
765
766   /* now configure the values, we use these to track timestamps on the
767    * sinkpad. */
768   if (segment->format != GST_FORMAT_TIME) {
769     /* non-time format, pretend the current time segment is closed with a
770      * 0 start and unknown stop time. */
771     segment->format = GST_FORMAT_TIME;
772     segment->start = 0;
773     segment->stop = -1;
774     segment->time = 0;
775   }
776
777   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
778
779   if (is_sink)
780     queue->sink_tainted = TRUE;
781   else
782     queue->src_tainted = TRUE;
783
784   /* segment can update the time level of the queue */
785   update_time_level (queue);
786 }
787
788 static void
789 apply_gap (GstQueue2 * queue, GstEvent * event,
790     GstSegment * segment, gboolean is_sink)
791 {
792   GstClockTime timestamp;
793   GstClockTime duration;
794
795   gst_event_parse_gap (event, &timestamp, &duration);
796
797   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
798
799     if (GST_CLOCK_TIME_IS_VALID (duration)) {
800       timestamp += duration;
801     }
802
803     segment->position = timestamp;
804
805     if (is_sink)
806       queue->sink_tainted = TRUE;
807     else
808       queue->src_tainted = TRUE;
809
810     /* calc diff with other end */
811     update_time_level (queue);
812   }
813 }
814
815 /* take a buffer and update segment, updating the time level of the queue. */
816 static void
817 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
818     guint64 size, gboolean is_sink)
819 {
820   GstClockTime duration, timestamp;
821
822   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
823   duration = GST_BUFFER_DURATION (buffer);
824
825   /* If we have no duration, pick one from the bitrate if we can */
826   if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) {
827     guint bitrate =
828         is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
829     if (bitrate)
830       duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
831   }
832
833   /* if no timestamp is set, assume it's continuous with the previous
834    * time */
835   if (timestamp == GST_CLOCK_TIME_NONE)
836     timestamp = segment->position;
837
838   /* add duration */
839   if (duration != GST_CLOCK_TIME_NONE)
840     timestamp += duration;
841
842   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
843       GST_TIME_ARGS (timestamp));
844
845   segment->position = timestamp;
846
847   if (is_sink)
848     queue->sink_tainted = TRUE;
849   else
850     queue->src_tainted = TRUE;
851
852   /* calc diff with other end */
853   update_time_level (queue);
854 }
855
856 struct BufListData
857 {
858   GstClockTime timestamp;
859   guint bitrate;
860 };
861
862 static gboolean
863 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
864 {
865   struct BufListData *bld = data;
866   GstClockTime *timestamp = &bld->timestamp;
867   GstClockTime btime;
868
869   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
870       " duration %" GST_TIME_FORMAT, idx,
871       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
872       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
873       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
874
875   btime = GST_BUFFER_DTS_OR_PTS (*buf);
876   if (GST_CLOCK_TIME_IS_VALID (btime))
877     *timestamp = btime;
878
879   if (GST_BUFFER_DURATION_IS_VALID (*buf))
880     *timestamp += GST_BUFFER_DURATION (*buf);
881   else if (bld->bitrate != 0) {
882     guint64 size = gst_buffer_get_size (*buf);
883
884     /* If we have no duration, pick one from the bitrate if we can */
885     *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size);
886   }
887
888
889   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
890   return TRUE;
891 }
892
893 /* take a buffer list and update segment, updating the time level of the queue */
894 static void
895 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
896     GstSegment * segment, gboolean is_sink)
897 {
898   struct BufListData bld;
899
900   /* if no timestamp is set, assume it's continuous with the previous time */
901   bld.timestamp = segment->position;
902
903   if (queue->use_tags_bitrate) {
904     if (is_sink)
905       bld.bitrate = queue->sink_tags_bitrate;
906     else
907       bld.bitrate = queue->src_tags_bitrate;
908   } else
909     bld.bitrate = 0;
910
911   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
912
913   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
914       GST_TIME_ARGS (bld.timestamp));
915
916   segment->position = bld.timestamp;
917
918   if (is_sink)
919     queue->sink_tainted = TRUE;
920   else
921     queue->src_tainted = TRUE;
922
923   /* calc diff with other end */
924   update_time_level (queue);
925 }
926
927 static inline gint
928 normalize_to_buffering_level (guint64 cur_level, guint64 max_level,
929     guint64 alt_max)
930 {
931   guint64 p;
932
933   if (max_level == 0)
934     return 0;
935
936   if (alt_max > 0)
937     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL,
938         MIN (max_level, alt_max));
939   else
940     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level);
941
942   return MIN (p, MAX_BUFFERING_LEVEL);
943 }
944
945 static gboolean
946 get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
947     gint * buffering_level)
948 {
949   gint buflevel, buflevel2;
950
951   if (queue->high_watermark <= 0) {
952     if (buffering_level)
953       *buffering_level = MAX_BUFFERING_LEVEL;
954     if (is_buffering)
955       *is_buffering = FALSE;
956     return FALSE;
957   }
958 #define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
959     normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
960
961   if (queue->is_eos || queue->srcresult == GST_FLOW_NOT_LINKED) {
962     /* on EOS and NOT_LINKED we are always 100% full, we set the var
963      * here so that we can reuse the logic below to stop buffering */
964     buflevel = MAX_BUFFERING_LEVEL;
965     GST_LOG_OBJECT (queue, "we are %s", queue->is_eos ? "EOS" : "NOT_LINKED");
966   } else {
967     GST_LOG_OBJECT (queue,
968         "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
969         queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
970         queue->cur_level.buffers);
971
972     /* figure out the buffering level we are filled, we take the max of all formats. */
973     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
974       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0);
975     } else {
976       guint64 rb_size = queue->ring_buffer_max_size;
977       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size);
978     }
979
980     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0);
981     buflevel = MAX (buflevel, buflevel2);
982
983     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0);
984     buflevel = MAX (buflevel, buflevel2);
985
986     /* also apply the rate estimate when we need to */
987     if (queue->use_rate_estimate) {
988       buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0);
989       buflevel = MAX (buflevel, buflevel2);
990     }
991
992     /* Don't get to 0% unless we're really empty */
993     if (queue->cur_level.bytes > 0)
994       buflevel = MAX (1, buflevel);
995   }
996 #undef GET_BUFFER_LEVEL_FOR_QUANTITY
997
998   if (is_buffering)
999     *is_buffering = queue->is_buffering;
1000
1001   if (buffering_level)
1002     *buffering_level = buflevel;
1003
1004   GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering,
1005       buflevel);
1006
1007   return TRUE;
1008 }
1009
1010 static gint
1011 convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level)
1012 {
1013   int percent;
1014
1015   /* scale so that if buffering_level equals the high watermark,
1016    * the percentage is 100% */
1017   percent = buffering_level * 100 / queue->high_watermark;
1018   /* clip */
1019   if (percent > 100)
1020     percent = 100;
1021
1022   return percent;
1023 }
1024
1025 static void
1026 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
1027     gint * avg_in, gint * avg_out, gint64 * buffering_left)
1028 {
1029   if (mode) {
1030     if (!QUEUE_IS_USING_QUEUE (queue)) {
1031       if (QUEUE_IS_USING_RING_BUFFER (queue))
1032         *mode = GST_BUFFERING_TIMESHIFT;
1033       else
1034         *mode = GST_BUFFERING_DOWNLOAD;
1035     } else {
1036       *mode = GST_BUFFERING_STREAM;
1037     }
1038   }
1039
1040   if (avg_in)
1041     *avg_in = queue->byte_in_rate;
1042   if (avg_out)
1043     *avg_out = queue->byte_out_rate;
1044
1045   if (buffering_left) {
1046     *buffering_left = (percent == 100 ? 0 : -1);
1047
1048     if (queue->use_rate_estimate) {
1049       guint64 max, cur;
1050
1051       max = queue->max_level.rate_time;
1052       cur = queue->cur_level.rate_time;
1053
1054       if (percent != 100 && max > cur)
1055         *buffering_left = (max - cur) / 1000000;
1056     }
1057   }
1058 }
1059
1060 /* Called with the lock taken */
1061 static GstMessage *
1062 gst_queue2_get_buffering_message (GstQueue2 * queue)
1063 {
1064   GstMessage *msg = NULL;
1065
1066   if (queue->percent_changed) {
1067     gint percent = queue->buffering_percent;
1068
1069     queue->percent_changed = FALSE;
1070
1071     GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
1072     msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
1073
1074     gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
1075         queue->avg_out, queue->buffering_left);
1076   }
1077
1078   return msg;
1079 }
1080
1081 static void
1082 gst_queue2_post_buffering (GstQueue2 * queue)
1083 {
1084   GstMessage *msg = NULL;
1085
1086   g_mutex_lock (&queue->buffering_post_lock);
1087   GST_QUEUE2_MUTEX_LOCK (queue);
1088   msg = gst_queue2_get_buffering_message (queue);
1089   GST_QUEUE2_MUTEX_UNLOCK (queue);
1090
1091   if (msg != NULL)
1092     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
1093
1094   g_mutex_unlock (&queue->buffering_post_lock);
1095 }
1096
1097 static void
1098 update_buffering (GstQueue2 * queue)
1099 {
1100   gint buffering_level, percent;
1101
1102   /* Ensure the variables used to calculate buffering state are up-to-date. */
1103   if (queue->current)
1104     update_cur_level (queue, queue->current);
1105   update_in_rates (queue, FALSE);
1106
1107   if (!get_buffering_level (queue, NULL, &buffering_level))
1108     return;
1109
1110   percent = convert_to_buffering_percent (queue, buffering_level);
1111
1112   if (queue->is_buffering) {
1113     /* if we were buffering see if we reached the high watermark */
1114     if (percent >= 100)
1115       queue->is_buffering = FALSE;
1116
1117     SET_PERCENT (queue, percent);
1118   } else {
1119     /* we were not buffering, check if we need to start buffering if we drop
1120      * below the low threshold */
1121     if (buffering_level < queue->low_watermark) {
1122       queue->is_buffering = TRUE;
1123       SET_PERCENT (queue, percent);
1124     }
1125   }
1126 }
1127
1128 static void
1129 reset_rate_timer (GstQueue2 * queue)
1130 {
1131   queue->bytes_in = 0;
1132   queue->bytes_out = 0;
1133   queue->byte_in_rate = 0.0;
1134   queue->byte_in_period = 0;
1135   queue->byte_out_rate = 0.0;
1136   queue->last_update_in_rates_elapsed = 0.0;
1137   queue->last_in_elapsed = 0.0;
1138   queue->last_out_elapsed = 0.0;
1139   queue->in_timer_started = FALSE;
1140   queue->out_timer_started = FALSE;
1141 }
1142
1143 /* the interval in seconds to recalculate the rate */
1144 #define RATE_INTERVAL    0.2
1145 /* Tuning for rate estimation. We use a large window for the input rate because
1146  * it should be stable when connected to a network. The output rate is less
1147  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1148  * therefore adapt more quickly.
1149  * However, initial input rate may be subject to a burst, and should therefore
1150  * initially also adapt more quickly to changes, and only later on give higher
1151  * weight to previous values. */
1152 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1153 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1154
1155 static void
1156 update_in_rates (GstQueue2 * queue, gboolean force)
1157 {
1158   gdouble elapsed, period;
1159   gdouble byte_in_rate;
1160
1161   if (!queue->in_timer_started) {
1162     queue->in_timer_started = TRUE;
1163     g_timer_start (queue->in_timer);
1164     return;
1165   }
1166
1167   queue->last_update_in_rates_elapsed = elapsed =
1168       g_timer_elapsed (queue->in_timer, NULL);
1169
1170   /* recalc after each interval. */
1171   if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1172     period = elapsed - queue->last_in_elapsed;
1173
1174     GST_DEBUG_OBJECT (queue,
1175         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1176         period, queue->bytes_in, queue->byte_in_period);
1177
1178     byte_in_rate = queue->bytes_in / period;
1179
1180     if (queue->byte_in_rate == 0.0)
1181       queue->byte_in_rate = byte_in_rate;
1182     else
1183       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1184           (double) queue->byte_in_period, period);
1185
1186     /* another data point, cap at 16 for long time running average */
1187     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1188       queue->byte_in_period += period;
1189
1190     /* reset the values to calculate rate over the next interval */
1191     queue->last_in_elapsed = elapsed;
1192     queue->bytes_in = 0;
1193   }
1194
1195   if (queue->byte_in_rate > 0.0) {
1196     queue->cur_level.rate_time =
1197         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1198   }
1199   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1200       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1201 }
1202
1203 static void
1204 update_out_rates (GstQueue2 * queue)
1205 {
1206   gdouble elapsed, period;
1207   gdouble byte_out_rate;
1208
1209   if (!queue->out_timer_started) {
1210     queue->out_timer_started = TRUE;
1211     g_timer_start (queue->out_timer);
1212     return;
1213   }
1214
1215   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1216
1217   /* recalc after each interval. */
1218   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1219     period = elapsed - queue->last_out_elapsed;
1220
1221     GST_DEBUG_OBJECT (queue,
1222         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1223
1224     byte_out_rate = queue->bytes_out / period;
1225
1226     if (queue->byte_out_rate == 0.0)
1227       queue->byte_out_rate = byte_out_rate;
1228     else
1229       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1230
1231     /* reset the values to calculate rate over the next interval */
1232     queue->last_out_elapsed = elapsed;
1233     queue->bytes_out = 0;
1234   }
1235   if (queue->byte_in_rate > 0.0) {
1236     queue->cur_level.rate_time =
1237         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1238   }
1239   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1240       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1241 }
1242
1243 static void
1244 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1245 {
1246   guint64 reading_pos, max_reading_pos;
1247
1248   reading_pos = pos;
1249   max_reading_pos = range->max_reading_pos;
1250
1251   max_reading_pos = MAX (max_reading_pos, reading_pos);
1252
1253   GST_DEBUG_OBJECT (queue,
1254       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1255       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1256   range->max_reading_pos = max_reading_pos;
1257
1258   update_cur_level (queue, range);
1259 }
1260
1261 static gboolean
1262 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1263 {
1264   GstEvent *event;
1265   gboolean res;
1266
1267   /* until we receive the FLUSH_STOP from this seek, we skip data */
1268   queue->seeking = TRUE;
1269   GST_QUEUE2_MUTEX_UNLOCK (queue);
1270
1271   debug_ranges (queue);
1272
1273   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1274
1275   event =
1276       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1277       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1278       GST_SEEK_TYPE_NONE, -1);
1279
1280   res = gst_pad_push_event (queue->sinkpad, event);
1281   GST_QUEUE2_MUTEX_LOCK (queue);
1282
1283   if (res) {
1284     /* Between us sending the seek event and re-acquiring the lock, the source
1285      * thread might already have pushed data and moved along the range's
1286      * writing_pos beyond the seek offset. In that case we don't want to set
1287      * the writing position back to the requested seek position, as it would
1288      * cause data to be written to the wrong offset in the file or ring buffer.
1289      * We still do the add_range call to switch the current range to the
1290      * requested range, or create one if one doesn't exist yet. */
1291     queue->current = add_range (queue, offset, FALSE);
1292   }
1293
1294   return res;
1295 }
1296
1297 /* get the threshold for when we decide to seek rather than wait */
1298 static guint64
1299 get_seek_threshold (GstQueue2 * queue)
1300 {
1301   guint64 threshold;
1302
1303   /* FIXME, find a good threshold based on the incoming rate. */
1304   threshold = 1024 * 512;
1305
1306   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1307     threshold = MIN (threshold,
1308         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1309   }
1310   return threshold;
1311 }
1312
1313 /* see if there is enough data in the file to read a full buffer */
1314 static gboolean
1315 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1316 {
1317   GstQueue2Range *range;
1318
1319   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1320       offset, length);
1321
1322   if ((range = find_range (queue, offset))) {
1323     if (queue->current != range) {
1324       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1325       perform_seek_to_offset (queue, range->writing_pos);
1326     }
1327
1328     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1329         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1330
1331     /* we have a range for offset */
1332     GST_DEBUG_OBJECT (queue,
1333         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1334         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1335
1336     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1337       return TRUE;
1338
1339     if (offset + length <= range->writing_pos)
1340       return TRUE;
1341     else
1342       GST_DEBUG_OBJECT (queue,
1343           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1344           (offset + length) - range->writing_pos);
1345
1346   } else {
1347     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1348         " len %u", offset, length);
1349     /* we don't have the range, see how far away we are */
1350     if (!queue->is_eos && queue->current) {
1351       guint64 threshold = get_seek_threshold (queue);
1352
1353       if (offset >= queue->current->offset && offset <=
1354           queue->current->writing_pos + threshold) {
1355         GST_INFO_OBJECT (queue,
1356             "requested data is within range, wait for data");
1357         return FALSE;
1358       }
1359     }
1360
1361     /* too far away, do a seek */
1362     perform_seek_to_offset (queue, offset);
1363   }
1364
1365   return FALSE;
1366 }
1367
1368 #ifdef HAVE_FSEEKO
1369 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1370 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1371 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1372 #else
1373 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1374 #endif
1375
1376 static GstFlowReturn
1377 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1378     guint8 * dst, gint64 * read_return)
1379 {
1380   guint8 *ring_buffer;
1381   size_t res;
1382
1383   ring_buffer = queue->ring_buffer;
1384
1385   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1386     goto seek_failed;
1387
1388   /* this should not block */
1389   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1390       length, offset);
1391   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1392     res = fread (dst, 1, length, queue->temp_file);
1393   } else {
1394     memcpy (dst, ring_buffer + offset, length);
1395     res = length;
1396   }
1397
1398   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1399
1400   if (G_UNLIKELY (res < length)) {
1401     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1402       goto could_not_read;
1403     /* check for errors or EOF */
1404     if (ferror (queue->temp_file))
1405       goto could_not_read;
1406     if (feof (queue->temp_file) && length > 0)
1407       goto eos;
1408   }
1409
1410   *read_return = res;
1411
1412   return GST_FLOW_OK;
1413
1414 seek_failed:
1415   {
1416     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1417     return GST_FLOW_ERROR;
1418   }
1419 could_not_read:
1420   {
1421     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1422     return GST_FLOW_ERROR;
1423   }
1424 eos:
1425   {
1426     GST_DEBUG ("non-regular file hits EOS");
1427     return GST_FLOW_EOS;
1428   }
1429 }
1430
1431 static GstFlowReturn
1432 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1433     GstBuffer ** buffer)
1434 {
1435   GstBuffer *buf;
1436   GstMapInfo info;
1437   guint8 *data;
1438   guint64 file_offset;
1439   guint block_length, remaining, read_length;
1440   guint64 rb_size;
1441   guint64 max_size;
1442   guint64 rpos;
1443   GstFlowReturn ret = GST_FLOW_OK;
1444
1445   /* allocate the output buffer of the requested size */
1446   if (*buffer == NULL)
1447     buf = gst_buffer_new_allocate (NULL, length, NULL);
1448   else
1449     buf = *buffer;
1450
1451   if (!gst_buffer_map (buf, &info, GST_MAP_WRITE))
1452     goto buffer_write_fail;
1453   data = info.data;
1454
1455   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1456       offset);
1457
1458   rpos = offset;
1459   rb_size = queue->ring_buffer_max_size;
1460   max_size = QUEUE_MAX_BYTES (queue);
1461
1462   remaining = length;
1463   while (remaining > 0) {
1464     /* configure how much/whether to read */
1465     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1466       read_length = 0;
1467
1468       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1469         guint64 level;
1470
1471         /* calculate how far away the offset is */
1472         if (queue->current->writing_pos > rpos)
1473           level = queue->current->writing_pos - rpos;
1474         else
1475           level = 0;
1476
1477         GST_DEBUG_OBJECT (queue,
1478             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1479             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1480             rpos, queue->current->writing_pos, level, max_size);
1481
1482         if (level >= max_size) {
1483           /* we don't have the data but if we have a ring buffer that is full, we
1484            * need to read */
1485           GST_DEBUG_OBJECT (queue,
1486               "ring buffer full, reading QUEUE_MAX_BYTES %"
1487               G_GUINT64_FORMAT " bytes", max_size);
1488           read_length = max_size;
1489         } else if (queue->is_eos) {
1490           /* won't get any more data so read any data we have */
1491           if (level) {
1492             GST_DEBUG_OBJECT (queue,
1493                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1494                 level);
1495             read_length = level;
1496             remaining = level;
1497             length = level;
1498           } else
1499             goto hit_eos;
1500         }
1501       }
1502
1503       if (read_length == 0) {
1504         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1505           GST_DEBUG_OBJECT (queue,
1506               "update current position [%" G_GUINT64_FORMAT "-%"
1507               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1508           update_cur_pos (queue, queue->current, rpos);
1509           GST_QUEUE2_SIGNAL_DEL (queue);
1510         }
1511
1512         if (queue->use_buffering)
1513           update_buffering (queue);
1514
1515         GST_DEBUG_OBJECT (queue, "waiting for add");
1516         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1517         continue;
1518       }
1519     } else {
1520       /* we have the requested data so read it */
1521       read_length = remaining;
1522     }
1523
1524     /* set range reading_pos to actual reading position for this read */
1525     queue->current->reading_pos = rpos;
1526
1527     /* configure how much and from where to read */
1528     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1529       file_offset =
1530           (queue->current->rb_offset + (rpos -
1531               queue->current->offset)) % rb_size;
1532       if (file_offset + read_length > rb_size) {
1533         block_length = rb_size - file_offset;
1534       } else {
1535         block_length = read_length;
1536       }
1537     } else {
1538       file_offset = rpos;
1539       block_length = read_length;
1540     }
1541
1542     /* while we still have data to read, we loop */
1543     while (read_length > 0) {
1544       gint64 read_return;
1545
1546       ret =
1547           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1548           data, &read_return);
1549       if (ret != GST_FLOW_OK)
1550         goto read_error;
1551
1552       file_offset += read_return;
1553       if (QUEUE_IS_USING_RING_BUFFER (queue))
1554         file_offset %= rb_size;
1555
1556       data += read_return;
1557       read_length -= read_return;
1558       block_length = read_length;
1559       remaining -= read_return;
1560
1561       rpos = (queue->current->reading_pos += read_return);
1562       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1563     }
1564     GST_QUEUE2_SIGNAL_DEL (queue);
1565     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1566   }
1567
1568   gst_buffer_unmap (buf, &info);
1569   gst_buffer_resize (buf, 0, length);
1570
1571   GST_BUFFER_OFFSET (buf) = offset;
1572   GST_BUFFER_OFFSET_END (buf) = offset + length;
1573
1574   *buffer = buf;
1575
1576   return ret;
1577
1578   /* ERRORS */
1579 hit_eos:
1580   {
1581     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1582     gst_buffer_unmap (buf, &info);
1583     if (*buffer == NULL)
1584       gst_buffer_unref (buf);
1585     return GST_FLOW_EOS;
1586   }
1587 out_flushing:
1588   {
1589     GST_DEBUG_OBJECT (queue, "we are flushing");
1590     gst_buffer_unmap (buf, &info);
1591     if (*buffer == NULL)
1592       gst_buffer_unref (buf);
1593     return GST_FLOW_FLUSHING;
1594   }
1595 read_error:
1596   {
1597     GST_DEBUG_OBJECT (queue, "we have a read error");
1598     gst_buffer_unmap (buf, &info);
1599     if (*buffer == NULL)
1600       gst_buffer_unref (buf);
1601     return ret;
1602   }
1603 buffer_write_fail:
1604   {
1605     GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL),
1606         ("Can't write to buffer"));
1607     if (*buffer == NULL)
1608       gst_buffer_unref (buf);
1609     return GST_FLOW_ERROR;
1610   }
1611 }
1612
1613 /* should be called with QUEUE_LOCK */
1614 static GstMiniObject *
1615 gst_queue2_read_item_from_file (GstQueue2 * queue)
1616 {
1617   GstMiniObject *item;
1618
1619   if (queue->stream_start_event != NULL) {
1620     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1621     queue->stream_start_event = NULL;
1622   } else if (queue->starting_segment != NULL) {
1623     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1624     queue->starting_segment = NULL;
1625   } else {
1626     GstFlowReturn ret;
1627     GstBuffer *buffer = NULL;
1628     guint64 reading_pos;
1629
1630     reading_pos = queue->current->reading_pos;
1631
1632     ret =
1633         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1634         &buffer);
1635
1636     switch (ret) {
1637       case GST_FLOW_OK:
1638         item = GST_MINI_OBJECT_CAST (buffer);
1639         break;
1640       case GST_FLOW_EOS:
1641         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1642         break;
1643       default:
1644         item = NULL;
1645         break;
1646     }
1647   }
1648   return item;
1649 }
1650
1651 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1652  * the temp filename. */
1653 static gboolean
1654 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1655 {
1656   gint fd = -1;
1657   gchar *name = NULL;
1658
1659   if (queue->temp_file)
1660     goto already_opened;
1661
1662   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1663
1664   /* If temp_template was set, allocate a filename and open that file */
1665
1666   /* nothing to do */
1667   if (queue->temp_template == NULL)
1668     goto no_directory;
1669
1670   /* make copy of the template, we don't want to change this */
1671   name = g_strdup (queue->temp_template);
1672
1673 #ifdef __BIONIC__
1674   fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1675 #else
1676   fd = g_mkstemp (name);
1677 #endif
1678
1679   if (fd == -1)
1680     goto mkstemp_failed;
1681
1682   /* open the file for update/writing */
1683   queue->temp_file = fdopen (fd, "wb+");
1684   /* error creating file */
1685   if (queue->temp_file == NULL)
1686     goto open_failed;
1687
1688   g_free (queue->temp_location);
1689   queue->temp_location = name;
1690
1691   GST_QUEUE2_MUTEX_UNLOCK (queue);
1692
1693   /* we can't emit the notify with the lock */
1694   g_object_notify (G_OBJECT (queue), "temp-location");
1695
1696   GST_QUEUE2_MUTEX_LOCK (queue);
1697
1698   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1699
1700   return TRUE;
1701
1702   /* ERRORS */
1703 already_opened:
1704   {
1705     GST_DEBUG_OBJECT (queue, "temp file was already open");
1706     return TRUE;
1707   }
1708 no_directory:
1709   {
1710     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1711         (_("No Temp directory specified.")), (NULL));
1712     return FALSE;
1713   }
1714 mkstemp_failed:
1715   {
1716     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1717         (_("Could not create temp file \"%s\"."), queue->temp_template),
1718         GST_ERROR_SYSTEM);
1719     g_free (name);
1720     return FALSE;
1721   }
1722 open_failed:
1723   {
1724     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1725         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1726     g_free (name);
1727     if (fd != -1)
1728       close (fd);
1729     return FALSE;
1730   }
1731 }
1732
1733 static void
1734 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1735 {
1736   /* nothing to do */
1737   if (queue->temp_file == NULL)
1738     return;
1739
1740   GST_DEBUG_OBJECT (queue, "closing temp file");
1741
1742   fflush (queue->temp_file);
1743   fclose (queue->temp_file);
1744
1745   if (queue->temp_remove) {
1746     if (remove (queue->temp_location) < 0) {
1747       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1748           queue->temp_location, g_strerror (errno));
1749     }
1750   }
1751
1752   queue->temp_file = NULL;
1753   clean_ranges (queue);
1754 }
1755
1756 static void
1757 gst_queue2_flush_temp_file (GstQueue2 * queue)
1758 {
1759   if (queue->temp_file == NULL)
1760     return;
1761
1762   GST_DEBUG_OBJECT (queue, "flushing temp file");
1763
1764   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1765 }
1766
1767 static void
1768 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1769 {
1770   if (!QUEUE_IS_USING_QUEUE (queue)) {
1771     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1772       gst_queue2_flush_temp_file (queue);
1773     init_ranges (queue);
1774   } else {
1775     while (!g_queue_is_empty (&queue->queue)) {
1776       GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1777
1778       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1779           && GST_EVENT_IS_STICKY (qitem->item)
1780           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1781           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1782         gst_pad_store_sticky_event (queue->srcpad,
1783             GST_EVENT_CAST (qitem->item));
1784       }
1785
1786       /* Then lose another reference because we are supposed to destroy that
1787          data when flushing */
1788       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1789         gst_mini_object_unref (qitem->item);
1790       g_slice_free (GstQueue2Item, qitem);
1791     }
1792   }
1793   queue->last_query = FALSE;
1794   g_cond_signal (&queue->query_handled);
1795   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1796   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1797   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1798   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1799   queue->sink_tainted = queue->src_tainted = TRUE;
1800   if (queue->starting_segment != NULL)
1801     gst_event_unref (queue->starting_segment);
1802   queue->starting_segment = NULL;
1803   queue->segment_event_received = FALSE;
1804   gst_event_replace (&queue->stream_start_event, NULL);
1805
1806   /* we deleted a lot of something */
1807   GST_QUEUE2_SIGNAL_DEL (queue);
1808 }
1809
1810 static gboolean
1811 gst_queue2_wait_free_space (GstQueue2 * queue)
1812 {
1813   /* We make space available if we're "full" according to whatever
1814    * the user defined as "full". */
1815   if (gst_queue2_is_filled (queue)) {
1816     gboolean started;
1817
1818     /* pause the timer while we wait. The fact that we are waiting does not mean
1819      * the byterate on the input pad is lower */
1820     if ((started = queue->in_timer_started))
1821       g_timer_stop (queue->in_timer);
1822
1823     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1824         "queue is full, waiting for free space");
1825     do {
1826       /* Wait for space to be available, we could be unlocked because of a flush. */
1827       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1828     }
1829     while (gst_queue2_is_filled (queue));
1830
1831     /* and continue if we were running before */
1832     if (started)
1833       g_timer_continue (queue->in_timer);
1834   }
1835   return TRUE;
1836
1837   /* ERRORS */
1838 out_flushing:
1839   {
1840     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1841     return FALSE;
1842   }
1843 }
1844
1845 static gboolean
1846 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1847 {
1848   GstMapInfo info;
1849   guint8 *data, *ring_buffer;
1850   guint size, rb_size;
1851   guint64 writing_pos, new_writing_pos;
1852   GstQueue2Range *range, *prev, *next;
1853   gboolean do_seek = FALSE;
1854
1855   if (QUEUE_IS_USING_RING_BUFFER (queue))
1856     writing_pos = queue->current->rb_writing_pos;
1857   else
1858     writing_pos = queue->current->writing_pos;
1859   ring_buffer = queue->ring_buffer;
1860   rb_size = queue->ring_buffer_max_size;
1861
1862   if (!gst_buffer_map (buffer, &info, GST_MAP_READ))
1863     goto buffer_read_error;
1864
1865   size = info.size;
1866   data = info.data;
1867
1868   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1869       writing_pos);
1870
1871   /* sanity check */
1872   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1873       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1874     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1875         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1876         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1877   }
1878
1879   while (size > 0) {
1880     guint to_write;
1881
1882     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1883       gint64 space;
1884
1885       /* calculate the space in the ring buffer not used by data from
1886        * the current range */
1887       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1888         /* wait until there is some free space */
1889         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1890       }
1891       /* get the amount of space we have */
1892       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1893
1894       /* calculate if we need to split or if we can write the entire
1895        * buffer now */
1896       to_write = MIN (size, space);
1897
1898       /* the writing position in the ring buffer after writing (part
1899        * or all of) the buffer */
1900       new_writing_pos = (writing_pos + to_write) % rb_size;
1901
1902       prev = NULL;
1903       range = queue->ranges;
1904
1905       /* if we need to overwrite data in the ring buffer, we need to
1906        * update the ranges
1907        *
1908        * warning: this code is complicated and includes some
1909        * simplifications - pen, paper and diagrams for the cases
1910        * recommended! */
1911       while (range) {
1912         guint64 range_data_start, range_data_end;
1913         GstQueue2Range *range_to_destroy = NULL;
1914
1915         if (range == queue->current)
1916           goto next_range;
1917
1918         range_data_start = range->rb_offset;
1919         range_data_end = range->rb_writing_pos;
1920
1921         /* handle the special case where the range has no data in it */
1922         if (range->writing_pos == range->offset) {
1923           if (range != queue->current) {
1924             GST_DEBUG_OBJECT (queue,
1925                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1926                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1927             /* remove range */
1928             range_to_destroy = range;
1929             if (prev)
1930               prev->next = range->next;
1931           }
1932           goto next_range;
1933         }
1934
1935         if (range_data_end > range_data_start) {
1936           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1937             goto next_range;
1938
1939           if (new_writing_pos > range_data_start) {
1940             if (new_writing_pos >= range_data_end) {
1941               GST_DEBUG_OBJECT (queue,
1942                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1943                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1944               /* remove range */
1945               range_to_destroy = range;
1946               if (prev)
1947                 prev->next = range->next;
1948             } else {
1949               GST_DEBUG_OBJECT (queue,
1950                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1951                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1952                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1953                   range->offset + new_writing_pos - range_data_start,
1954                   new_writing_pos);
1955               range->offset += (new_writing_pos - range_data_start);
1956               range->rb_offset = new_writing_pos;
1957             }
1958           }
1959         } else {
1960           guint64 new_wpos_virt = writing_pos + to_write;
1961
1962           if (new_wpos_virt <= range_data_start)
1963             goto next_range;
1964
1965           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1966             GST_DEBUG_OBJECT (queue,
1967                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1968                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1969             /* remove range */
1970             range_to_destroy = range;
1971             if (prev)
1972               prev->next = range->next;
1973           } else {
1974             GST_DEBUG_OBJECT (queue,
1975                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1976                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1977                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1978                 range->offset + new_writing_pos - range_data_start,
1979                 new_writing_pos);
1980             range->offset += (new_wpos_virt - range_data_start);
1981             range->rb_offset = new_writing_pos;
1982           }
1983         }
1984
1985       next_range:
1986         if (!range_to_destroy)
1987           prev = range;
1988
1989         range = range->next;
1990         if (range_to_destroy) {
1991           if (range_to_destroy == queue->ranges)
1992             queue->ranges = range;
1993           g_slice_free (GstQueue2Range, range_to_destroy);
1994           range_to_destroy = NULL;
1995         }
1996       }
1997     } else {
1998       to_write = size;
1999       new_writing_pos = writing_pos + to_write;
2000     }
2001
2002     if (QUEUE_IS_USING_TEMP_FILE (queue)
2003         && FSEEK_FILE (queue->temp_file, writing_pos))
2004       goto seek_failed;
2005
2006     if (new_writing_pos > writing_pos) {
2007       GST_INFO_OBJECT (queue,
2008           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
2009           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
2010           queue->current->writing_pos, queue->current->rb_writing_pos);
2011       /* either not using ring buffer or no wrapping, just write */
2012       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2013         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
2014           goto handle_error;
2015       } else {
2016         memcpy (ring_buffer + writing_pos, data, to_write);
2017       }
2018
2019       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
2020         /* try to merge with next range */
2021         while ((next = queue->current->next)) {
2022           GST_INFO_OBJECT (queue,
2023               "checking merge with next range %" G_GUINT64_FORMAT " < %"
2024               G_GUINT64_FORMAT, new_writing_pos, next->offset);
2025           if (new_writing_pos < next->offset)
2026             break;
2027
2028           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
2029               next->writing_pos);
2030
2031           /* remove the group */
2032           queue->current->next = next->next;
2033
2034           /* We use the threshold to decide if we want to do a seek or simply
2035            * read the data again. If there is not so much data in the range we
2036            * prefer to avoid to seek and read it again. */
2037           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
2038             /* the new range had more data than the threshold, it's worth keeping
2039              * it and doing a seek. */
2040             new_writing_pos = next->writing_pos;
2041             do_seek = TRUE;
2042           }
2043           g_slice_free (GstQueue2Range, next);
2044         }
2045         goto update_and_signal;
2046       }
2047     } else {
2048       /* wrapping */
2049       guint block_one, block_two;
2050
2051       block_one = rb_size - writing_pos;
2052       block_two = to_write - block_one;
2053
2054       if (block_one > 0) {
2055         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
2056         /* write data to end of ring buffer */
2057         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2058           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
2059             goto handle_error;
2060         } else {
2061           memcpy (ring_buffer + writing_pos, data, block_one);
2062         }
2063       }
2064
2065       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
2066         goto seek_failed;
2067
2068       if (block_two > 0) {
2069         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
2070         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2071           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
2072             goto handle_error;
2073         } else {
2074           memcpy (ring_buffer, data + block_one, block_two);
2075         }
2076       }
2077     }
2078
2079   update_and_signal:
2080     /* update the writing positions */
2081     size -= to_write;
2082     GST_INFO_OBJECT (queue,
2083         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
2084         to_write, writing_pos, size);
2085
2086     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2087       data += to_write;
2088       queue->current->writing_pos += to_write;
2089       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
2090     } else {
2091       queue->current->writing_pos = writing_pos = new_writing_pos;
2092     }
2093     if (do_seek)
2094       perform_seek_to_offset (queue, new_writing_pos);
2095
2096     update_cur_level (queue, queue->current);
2097
2098     /* update the buffering status */
2099     if (queue->use_buffering) {
2100       GstMessage *msg;
2101       update_buffering (queue);
2102       msg = gst_queue2_get_buffering_message (queue);
2103       if (msg) {
2104         GST_QUEUE2_MUTEX_UNLOCK (queue);
2105         g_mutex_lock (&queue->buffering_post_lock);
2106         gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
2107         g_mutex_unlock (&queue->buffering_post_lock);
2108         GST_QUEUE2_MUTEX_LOCK (queue);
2109       }
2110     }
2111
2112     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2113         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2114
2115     GST_QUEUE2_SIGNAL_ADD (queue);
2116   }
2117
2118   gst_buffer_unmap (buffer, &info);
2119
2120   return TRUE;
2121
2122   /* ERRORS */
2123 out_flushing:
2124   {
2125     GST_DEBUG_OBJECT (queue, "we are flushing");
2126     gst_buffer_unmap (buffer, &info);
2127     /* FIXME - GST_FLOW_EOS ? */
2128     return FALSE;
2129   }
2130 seek_failed:
2131   {
2132     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2133     gst_buffer_unmap (buffer, &info);
2134     return FALSE;
2135   }
2136 handle_error:
2137   {
2138     switch (errno) {
2139       case ENOSPC:{
2140         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2141         break;
2142       }
2143       default:{
2144         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2145             (_("Error while writing to download file.")),
2146             ("%s", g_strerror (errno)));
2147       }
2148     }
2149     gst_buffer_unmap (buffer, &info);
2150     return FALSE;
2151   }
2152 buffer_read_error:
2153   {
2154     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL),
2155         ("Can't read from buffer"));
2156     return FALSE;
2157   }
2158 }
2159
2160 static gboolean
2161 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2162 {
2163   GstQueue2 *queue = q;
2164
2165   GST_TRACE_OBJECT (queue,
2166       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2167       gst_buffer_get_size (*buf));
2168
2169   if (!gst_queue2_create_write (queue, *buf)) {
2170     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2171     return FALSE;
2172   }
2173   return TRUE;
2174 }
2175
2176 static gboolean
2177 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
2178 {
2179   guint *p_size = data;
2180   gsize buf_size;
2181
2182   buf_size = gst_buffer_get_size (*buf);
2183   GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
2184   *p_size += buf_size;
2185   return TRUE;
2186 }
2187
2188 /* enqueue an item an update the level stats */
2189 static void
2190 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2191     GstQueue2ItemType item_type)
2192 {
2193   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2194     GstBuffer *buffer;
2195     guint size;
2196
2197     buffer = GST_BUFFER_CAST (item);
2198     size = gst_buffer_get_size (buffer);
2199
2200     /* add buffer to the statistics */
2201     if (QUEUE_IS_USING_QUEUE (queue)) {
2202       queue->cur_level.buffers++;
2203       queue->cur_level.bytes += size;
2204     }
2205     queue->bytes_in += size;
2206
2207     /* apply new buffer to segment stats */
2208     apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE);
2209     /* update the byterate stats */
2210     update_in_rates (queue, FALSE);
2211
2212     if (!QUEUE_IS_USING_QUEUE (queue)) {
2213       /* FIXME - check return value? */
2214       gst_queue2_create_write (queue, buffer);
2215     }
2216   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2217     GstBufferList *buffer_list;
2218     guint size = 0;
2219
2220     buffer_list = GST_BUFFER_LIST_CAST (item);
2221
2222     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2223     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2224
2225     /* add buffer to the statistics */
2226     if (QUEUE_IS_USING_QUEUE (queue)) {
2227       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2228       queue->cur_level.bytes += size;
2229     }
2230     queue->bytes_in += size;
2231
2232     /* apply new buffer to segment stats */
2233     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2234
2235     /* update the byterate stats */
2236     update_in_rates (queue, FALSE);
2237
2238     if (!QUEUE_IS_USING_QUEUE (queue)) {
2239       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2240     }
2241   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2242     GstEvent *event;
2243
2244     event = GST_EVENT_CAST (item);
2245
2246     switch (GST_EVENT_TYPE (event)) {
2247       case GST_EVENT_EOS:
2248         /* Zero the thresholds, this makes sure the queue is completely
2249          * filled and we can read all data from the queue. */
2250         GST_DEBUG_OBJECT (queue, "we have EOS");
2251         queue->is_eos = TRUE;
2252         /* Force updating the input bitrate */
2253         update_in_rates (queue, TRUE);
2254         break;
2255       case GST_EVENT_SEGMENT:
2256         apply_segment (queue, event, &queue->sink_segment, TRUE);
2257         /* This is our first new segment, we hold it
2258          * as we can't save it on the temp file */
2259         if (!QUEUE_IS_USING_QUEUE (queue)) {
2260           if (queue->segment_event_received)
2261             goto unexpected_event;
2262
2263           queue->segment_event_received = TRUE;
2264           if (queue->starting_segment != NULL)
2265             gst_event_unref (queue->starting_segment);
2266           queue->starting_segment = event;
2267           item = NULL;
2268         }
2269         /* a new segment allows us to accept more buffers if we got EOS
2270          * from downstream */
2271         queue->unexpected = FALSE;
2272         break;
2273       case GST_EVENT_GAP:
2274         apply_gap (queue, event, &queue->sink_segment, TRUE);
2275         break;
2276       case GST_EVENT_STREAM_START:
2277         if (!QUEUE_IS_USING_QUEUE (queue)) {
2278           gst_event_replace (&queue->stream_start_event, event);
2279           gst_event_unref (event);
2280           item = NULL;
2281         }
2282         break;
2283       case GST_EVENT_CAPS:{
2284         GstCaps *caps;
2285
2286         gst_event_parse_caps (event, &caps);
2287         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2288
2289         if (!QUEUE_IS_USING_QUEUE (queue)) {
2290           GST_LOG ("Dropping caps event, not using queue");
2291           gst_event_unref (event);
2292           item = NULL;
2293         }
2294         break;
2295       }
2296       default:
2297         if (!QUEUE_IS_USING_QUEUE (queue))
2298           goto unexpected_event;
2299         break;
2300     }
2301   } else if (GST_IS_QUERY (item)) {
2302     /* Can't happen as we check that in the caller */
2303     if (!QUEUE_IS_USING_QUEUE (queue))
2304       g_assert_not_reached ();
2305   } else {
2306     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2307         item, GST_OBJECT_NAME (queue));
2308     /* we can't really unref since we don't know what it is */
2309     item = NULL;
2310   }
2311
2312   if (item) {
2313     /* update the buffering status */
2314     if (queue->use_buffering)
2315       update_buffering (queue);
2316
2317     if (QUEUE_IS_USING_QUEUE (queue)) {
2318       GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2319       qitem->type = item_type;
2320       qitem->item = item;
2321       g_queue_push_tail (&queue->queue, qitem);
2322     } else {
2323       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2324     }
2325
2326     GST_QUEUE2_SIGNAL_ADD (queue);
2327   }
2328
2329   return;
2330
2331   /* ERRORS */
2332 unexpected_event:
2333   {
2334     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2335
2336     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2337         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2338         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2339     gst_event_unref (GST_EVENT_CAST (item));
2340     return;
2341   }
2342 }
2343
2344 /* dequeue an item from the queue and update level stats */
2345 static GstMiniObject *
2346 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2347 {
2348   GstMiniObject *item;
2349
2350   if (!QUEUE_IS_USING_QUEUE (queue)) {
2351     item = gst_queue2_read_item_from_file (queue);
2352   } else {
2353     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2354
2355     if (qitem == NULL)
2356       goto no_item;
2357
2358     item = qitem->item;
2359     g_slice_free (GstQueue2Item, qitem);
2360   }
2361
2362   if (item == NULL)
2363     goto no_item;
2364
2365   if (GST_IS_BUFFER (item)) {
2366     GstBuffer *buffer;
2367     guint size;
2368
2369     buffer = GST_BUFFER_CAST (item);
2370     size = gst_buffer_get_size (buffer);
2371     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2372
2373     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2374         "retrieved buffer %p from queue", buffer);
2375
2376     if (QUEUE_IS_USING_QUEUE (queue)) {
2377       queue->cur_level.buffers--;
2378       queue->cur_level.bytes -= size;
2379     }
2380     queue->bytes_out += size;
2381
2382     apply_buffer (queue, buffer, &queue->src_segment, size, FALSE);
2383     /* update the byterate stats */
2384     update_out_rates (queue);
2385     /* update the buffering */
2386     if (queue->use_buffering)
2387       update_buffering (queue);
2388
2389   } else if (GST_IS_EVENT (item)) {
2390     GstEvent *event = GST_EVENT_CAST (item);
2391
2392     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2393
2394     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2395         "retrieved event %p from queue", event);
2396
2397     switch (GST_EVENT_TYPE (event)) {
2398       case GST_EVENT_EOS:
2399         /* queue is empty now that we dequeued the EOS */
2400         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2401         break;
2402       case GST_EVENT_SEGMENT:
2403         apply_segment (queue, event, &queue->src_segment, FALSE);
2404         break;
2405       case GST_EVENT_GAP:
2406         apply_gap (queue, event, &queue->src_segment, FALSE);
2407         break;
2408       default:
2409         break;
2410     }
2411   } else if (GST_IS_BUFFER_LIST (item)) {
2412     GstBufferList *buffer_list;
2413     guint size = 0;
2414
2415     buffer_list = GST_BUFFER_LIST_CAST (item);
2416     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2417     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2418
2419     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2420         "retrieved buffer list %p from queue", buffer_list);
2421
2422     if (QUEUE_IS_USING_QUEUE (queue)) {
2423       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2424       queue->cur_level.bytes -= size;
2425     }
2426     queue->bytes_out += size;
2427
2428     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2429     /* update the byterate stats */
2430     update_out_rates (queue);
2431     /* update the buffering */
2432     if (queue->use_buffering)
2433       update_buffering (queue);
2434   } else if (GST_IS_QUERY (item)) {
2435     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2436         "retrieved query %p from queue", item);
2437     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2438   } else {
2439     g_warning
2440         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2441         item, GST_OBJECT_NAME (queue));
2442     item = NULL;
2443     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2444   }
2445   GST_QUEUE2_SIGNAL_DEL (queue);
2446
2447   return item;
2448
2449   /* ERRORS */
2450 no_item:
2451   {
2452     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2453     return NULL;
2454   }
2455 }
2456
2457 static GstFlowReturn
2458 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2459     GstEvent * event)
2460 {
2461   gboolean ret = TRUE;
2462   GstQueue2 *queue;
2463
2464   queue = GST_QUEUE2 (parent);
2465
2466   switch (GST_EVENT_TYPE (event)) {
2467     case GST_EVENT_FLUSH_START:
2468     {
2469       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2470       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2471         /* forward event */
2472         ret = gst_pad_push_event (queue->srcpad, event);
2473
2474         /* now unblock the chain function */
2475         GST_QUEUE2_MUTEX_LOCK (queue);
2476         queue->srcresult = GST_FLOW_FLUSHING;
2477         queue->sinkresult = GST_FLOW_FLUSHING;
2478         /* unblock the loop and chain functions */
2479         GST_QUEUE2_SIGNAL_ADD (queue);
2480         GST_QUEUE2_SIGNAL_DEL (queue);
2481         GST_QUEUE2_MUTEX_UNLOCK (queue);
2482
2483         /* make sure it pauses, this should happen since we sent
2484          * flush_start downstream. */
2485         gst_pad_pause_task (queue->srcpad);
2486         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2487
2488         GST_QUEUE2_MUTEX_LOCK (queue);
2489         queue->last_query = FALSE;
2490         g_cond_signal (&queue->query_handled);
2491         GST_QUEUE2_MUTEX_UNLOCK (queue);
2492       } else {
2493         GST_QUEUE2_MUTEX_LOCK (queue);
2494         /* flush the sink pad */
2495         queue->sinkresult = GST_FLOW_FLUSHING;
2496         GST_QUEUE2_SIGNAL_DEL (queue);
2497         queue->last_query = FALSE;
2498         g_cond_signal (&queue->query_handled);
2499         GST_QUEUE2_MUTEX_UNLOCK (queue);
2500
2501         gst_event_unref (event);
2502       }
2503       break;
2504     }
2505     case GST_EVENT_FLUSH_STOP:
2506     {
2507       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2508
2509       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2510         /* forward event */
2511         ret = gst_pad_push_event (queue->srcpad, event);
2512
2513         GST_QUEUE2_MUTEX_LOCK (queue);
2514         gst_queue2_locked_flush (queue, FALSE, TRUE);
2515         queue->srcresult = GST_FLOW_OK;
2516         queue->sinkresult = GST_FLOW_OK;
2517         queue->is_eos = FALSE;
2518         queue->unexpected = FALSE;
2519         queue->seeking = FALSE;
2520         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2521         /* reset rate counters */
2522         reset_rate_timer (queue);
2523         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2524             queue->srcpad, NULL);
2525         GST_QUEUE2_MUTEX_UNLOCK (queue);
2526       } else {
2527         GST_QUEUE2_MUTEX_LOCK (queue);
2528         queue->segment_event_received = FALSE;
2529         queue->is_eos = FALSE;
2530         queue->unexpected = FALSE;
2531         queue->sinkresult = GST_FLOW_OK;
2532         queue->seeking = FALSE;
2533         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2534         GST_QUEUE2_MUTEX_UNLOCK (queue);
2535
2536         gst_event_unref (event);
2537       }
2538       break;
2539     }
2540     case GST_EVENT_TAG:{
2541       if (queue->use_tags_bitrate) {
2542         GstTagList *tags;
2543         guint bitrate;
2544
2545         gst_event_parse_tag (event, &tags);
2546         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2547             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2548           GST_QUEUE2_MUTEX_LOCK (queue);
2549           queue->sink_tags_bitrate = bitrate;
2550           GST_QUEUE2_MUTEX_UNLOCK (queue);
2551           GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
2552         }
2553       }
2554       /* Fall-through */
2555     }
2556     default:
2557       if (GST_EVENT_IS_SERIALIZED (event)) {
2558         /* serialized events go in the queue */
2559         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2560         if (queue->srcresult != GST_FLOW_OK) {
2561           /* Errors in sticky event pushing are no problem and ignored here
2562            * as they will cause more meaningful errors during data flow.
2563            * For EOS events, that are not followed by data flow, we still
2564            * return FALSE here though and report an error.
2565            */
2566           if (!GST_EVENT_IS_STICKY (event)) {
2567             goto out_flow_error;
2568           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2569             if (queue->srcresult == GST_FLOW_NOT_LINKED
2570                 || queue->srcresult < GST_FLOW_EOS) {
2571               GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
2572             }
2573             goto out_flow_error;
2574           }
2575         }
2576         /* refuse more events on EOS */
2577         if (queue->is_eos)
2578           goto out_eos;
2579         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2580         GST_QUEUE2_MUTEX_UNLOCK (queue);
2581         gst_queue2_post_buffering (queue);
2582       } else {
2583         /* non-serialized events are passed downstream. */
2584         ret = gst_pad_push_event (queue->srcpad, event);
2585       }
2586       break;
2587   }
2588   if (ret == FALSE)
2589     return GST_FLOW_ERROR;
2590   return GST_FLOW_OK;
2591
2592   /* ERRORS */
2593 out_flushing:
2594   {
2595     GstFlowReturn ret = queue->sinkresult;
2596     GST_DEBUG_OBJECT (queue, "refusing event, we are %s",
2597         gst_flow_get_name (ret));
2598     GST_QUEUE2_MUTEX_UNLOCK (queue);
2599     gst_event_unref (event);
2600     return ret;
2601   }
2602 out_eos:
2603   {
2604     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2605     GST_QUEUE2_MUTEX_UNLOCK (queue);
2606     gst_event_unref (event);
2607     return GST_FLOW_EOS;
2608   }
2609 out_flow_error:
2610   {
2611     GST_LOG_OBJECT (queue,
2612         "refusing event, we have a downstream flow error: %s",
2613         gst_flow_get_name (queue->srcresult));
2614     GST_QUEUE2_MUTEX_UNLOCK (queue);
2615     gst_event_unref (event);
2616     return queue->srcresult;
2617   }
2618 }
2619
2620 static gboolean
2621 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2622     GstQuery * query)
2623 {
2624   GstQueue2 *queue;
2625   gboolean res;
2626
2627   queue = GST_QUEUE2 (parent);
2628
2629   switch (GST_QUERY_TYPE (query)) {
2630     default:
2631       if (GST_QUERY_IS_SERIALIZED (query)) {
2632         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2633         /* serialized events go in the queue. We need to be certain that we
2634          * don't cause deadlocks waiting for the query return value. We check if
2635          * the queue is empty (nothing is blocking downstream and the query can
2636          * be pushed for sure) or we are not buffering. If we are buffering,
2637          * the pipeline waits to unblock downstream until our queue fills up
2638          * completely, which can not happen if we block on the query..
2639          * Therefore we only potentially block when we are not buffering. */
2640         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2641         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2642                 || !queue->use_buffering)) {
2643           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2644             gst_queue2_locked_enqueue (queue, query,
2645                 GST_QUEUE2_ITEM_TYPE_QUERY);
2646
2647             STATUS (queue, queue->sinkpad, "wait for QUERY");
2648             while (queue->sinkresult == GST_FLOW_OK &&
2649                 queue->last_handled_query != query)
2650               g_cond_wait (&queue->query_handled, &queue->qlock);
2651             queue->last_handled_query = NULL;
2652             if (queue->sinkresult != GST_FLOW_OK)
2653               goto out_flushing;
2654             res = queue->last_query;
2655           } else {
2656             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2657             res = FALSE;
2658           }
2659         } else {
2660           GST_DEBUG_OBJECT (queue,
2661               "refusing query, we are not using the queue");
2662           res = FALSE;
2663         }
2664         GST_QUEUE2_MUTEX_UNLOCK (queue);
2665         gst_queue2_post_buffering (queue);
2666       } else {
2667         res = gst_pad_query_default (pad, parent, query);
2668       }
2669       break;
2670   }
2671   return res;
2672
2673   /* ERRORS */
2674 out_flushing:
2675   {
2676     GST_DEBUG_OBJECT (queue, "refusing query, we are %s",
2677         gst_flow_get_name (queue->sinkresult));
2678     GST_QUEUE2_MUTEX_UNLOCK (queue);
2679     return FALSE;
2680   }
2681 }
2682
2683 static gboolean
2684 gst_queue2_is_empty (GstQueue2 * queue)
2685 {
2686   /* never empty on EOS */
2687   if (queue->is_eos)
2688     return FALSE;
2689
2690   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2691     return queue->current->writing_pos <= queue->current->max_reading_pos;
2692   } else {
2693     if (queue->queue.length == 0)
2694       return TRUE;
2695   }
2696
2697   return FALSE;
2698 }
2699
2700 static gboolean
2701 gst_queue2_is_filled (GstQueue2 * queue)
2702 {
2703   gboolean res;
2704
2705   /* always filled on EOS */
2706   if (queue->is_eos)
2707     return TRUE;
2708
2709 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2710     (queue->cur_level.format) >= ((alt_max) ? \
2711       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2712
2713   /* if using a ring buffer we're filled if all ring buffer space is used
2714    * _by the current range_ */
2715   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2716     guint64 rb_size = queue->ring_buffer_max_size;
2717     GST_DEBUG_OBJECT (queue,
2718         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2719         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2720     return CHECK_FILLED (bytes, rb_size);
2721   }
2722
2723   /* if using file, we're never filled if we don't have EOS */
2724   if (QUEUE_IS_USING_TEMP_FILE (queue))
2725     return FALSE;
2726
2727   /* we are never filled when we have no buffers at all */
2728   if (queue->cur_level.buffers == 0)
2729     return FALSE;
2730
2731   /* we are filled if one of the current levels exceeds the max */
2732   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2733       || CHECK_FILLED (time, 0);
2734
2735   /* if we need to, use the rate estimate to check against the max time we are
2736    * allowed to queue */
2737   if (queue->use_rate_estimate)
2738     res |= CHECK_FILLED (rate_time, 0);
2739
2740 #undef CHECK_FILLED
2741   return res;
2742 }
2743
2744 static GstFlowReturn
2745 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2746     GstMiniObject * item, GstQueue2ItemType item_type)
2747 {
2748   /* we have to lock the queue since we span threads */
2749   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2750   /* when we received EOS, we refuse more data */
2751   if (queue->is_eos)
2752     goto out_eos;
2753   /* when we received unexpected from downstream, refuse more buffers */
2754   if (queue->unexpected)
2755     goto out_unexpected;
2756
2757   /* while we didn't receive the newsegment, we're seeking and we skip data */
2758   if (queue->seeking)
2759     goto out_seeking;
2760
2761   if (!gst_queue2_wait_free_space (queue))
2762     goto out_flushing;
2763
2764   /* put buffer in queue now */
2765   gst_queue2_locked_enqueue (queue, item, item_type);
2766   GST_QUEUE2_MUTEX_UNLOCK (queue);
2767   gst_queue2_post_buffering (queue);
2768
2769   return GST_FLOW_OK;
2770
2771   /* special conditions */
2772 out_flushing:
2773   {
2774     GstFlowReturn ret = queue->sinkresult;
2775
2776     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2777         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2778     GST_QUEUE2_MUTEX_UNLOCK (queue);
2779     gst_mini_object_unref (item);
2780
2781     return ret;
2782   }
2783 out_eos:
2784   {
2785     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2786     GST_QUEUE2_MUTEX_UNLOCK (queue);
2787     gst_mini_object_unref (item);
2788
2789     return GST_FLOW_EOS;
2790   }
2791 out_seeking:
2792   {
2793     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2794     GST_QUEUE2_MUTEX_UNLOCK (queue);
2795     gst_mini_object_unref (item);
2796
2797     return GST_FLOW_OK;
2798   }
2799 out_unexpected:
2800   {
2801     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2802     GST_QUEUE2_MUTEX_UNLOCK (queue);
2803     gst_mini_object_unref (item);
2804
2805     return GST_FLOW_EOS;
2806   }
2807 }
2808
2809 static GstFlowReturn
2810 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2811 {
2812   GstQueue2 *queue;
2813
2814   queue = GST_QUEUE2 (parent);
2815
2816   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2817       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2818       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2819       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2820       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2821
2822   return gst_queue2_chain_buffer_or_buffer_list (queue,
2823       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2824 }
2825
2826 static GstFlowReturn
2827 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2828     GstBufferList * buffer_list)
2829 {
2830   GstQueue2 *queue;
2831
2832   queue = GST_QUEUE2 (parent);
2833
2834   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2835       "received buffer list %p", buffer_list);
2836
2837   return gst_queue2_chain_buffer_or_buffer_list (queue,
2838       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2839 }
2840
2841 static GstMiniObject *
2842 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2843 {
2844   GstMiniObject *data;
2845
2846   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2847
2848   /* stop pushing buffers, we dequeue all items until we see an item that we
2849    * can push again, which is EOS or SEGMENT. If there is nothing in the
2850    * queue we can push, we set a flag to make the sinkpad refuse more
2851    * buffers with an EOS return value until we receive something
2852    * pushable again or we get flushed. */
2853   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2854     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2855       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2856           "dropping EOS buffer %p", data);
2857       gst_buffer_unref (GST_BUFFER_CAST (data));
2858     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2859       GstEvent *event = GST_EVENT_CAST (data);
2860       GstEventType type = GST_EVENT_TYPE (event);
2861
2862       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2863         /* we found a pushable item in the queue, push it out */
2864         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2865             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2866         return data;
2867       }
2868       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2869           "dropping EOS event %p", event);
2870       gst_event_unref (event);
2871     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2872       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2873           "dropping EOS buffer list %p", data);
2874       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2875     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2876       queue->last_query = FALSE;
2877       g_cond_signal (&queue->query_handled);
2878       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2879     }
2880   }
2881   /* no more items in the queue. Set the unexpected flag so that upstream
2882    * make us refuse any more buffers on the sinkpad. Since we will still
2883    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2884    * task function does not shut down. */
2885   queue->unexpected = TRUE;
2886   return NULL;
2887 }
2888
2889 /* dequeue an item from the queue an push it downstream. This functions returns
2890  * the result of the push. */
2891 static GstFlowReturn
2892 gst_queue2_push_one (GstQueue2 * queue)
2893 {
2894   GstFlowReturn result = queue->srcresult;
2895   GstMiniObject *data;
2896   GstQueue2ItemType item_type;
2897
2898   data = gst_queue2_locked_dequeue (queue, &item_type);
2899   if (data == NULL)
2900     goto no_item;
2901
2902 next:
2903   STATUS (queue, queue->srcpad, "We have something dequeud");
2904   g_atomic_int_set (&queue->downstream_may_block,
2905       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2906       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2907   GST_QUEUE2_MUTEX_UNLOCK (queue);
2908   gst_queue2_post_buffering (queue);
2909
2910   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2911     GstBuffer *buffer;
2912
2913     buffer = GST_BUFFER_CAST (data);
2914
2915     result = gst_pad_push (queue->srcpad, buffer);
2916     g_atomic_int_set (&queue->downstream_may_block, 0);
2917
2918     /* need to check for srcresult here as well */
2919     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2920     if (result == GST_FLOW_EOS) {
2921       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2922       if (data != NULL)
2923         goto next;
2924       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2925        * to the caller so that the task function does not shut down */
2926       result = GST_FLOW_OK;
2927     }
2928   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2929     GstEvent *event = GST_EVENT_CAST (data);
2930     GstEventType type = GST_EVENT_TYPE (event);
2931
2932     if (type == GST_EVENT_TAG) {
2933       if (queue->use_tags_bitrate) {
2934         GstTagList *tags;
2935         guint bitrate;
2936
2937         gst_event_parse_tag (event, &tags);
2938         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2939             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2940           GST_QUEUE2_MUTEX_LOCK (queue);
2941           queue->src_tags_bitrate = bitrate;
2942           GST_QUEUE2_MUTEX_UNLOCK (queue);
2943           GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
2944         }
2945       }
2946     }
2947
2948     gst_pad_push_event (queue->srcpad, event);
2949
2950     /* if we're EOS, return EOS so that the task pauses. */
2951     if (type == GST_EVENT_EOS) {
2952       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2953           "pushed EOS event %p, return EOS", event);
2954       result = GST_FLOW_EOS;
2955     }
2956
2957     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2958   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2959     GstBufferList *buffer_list;
2960
2961     buffer_list = GST_BUFFER_LIST_CAST (data);
2962
2963     result = gst_pad_push_list (queue->srcpad, buffer_list);
2964     g_atomic_int_set (&queue->downstream_may_block, 0);
2965
2966     /* need to check for srcresult here as well */
2967     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2968     if (result == GST_FLOW_EOS) {
2969       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2970       if (data != NULL)
2971         goto next;
2972       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2973        * to the caller so that the task function does not shut down */
2974       result = GST_FLOW_OK;
2975     }
2976   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2977     GstQuery *query = GST_QUERY_CAST (data);
2978
2979     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2980     queue->last_handled_query = query;
2981     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2982     GST_LOG_OBJECT (queue->srcpad, "Peered query");
2983     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2984         "did query %p, return %d", query, queue->last_query);
2985     g_cond_signal (&queue->query_handled);
2986     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2987     result = GST_FLOW_OK;
2988   }
2989   return result;
2990
2991   /* ERRORS */
2992 no_item:
2993   {
2994     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2995         "exit because we have no item in the queue");
2996     return GST_FLOW_ERROR;
2997   }
2998 out_flushing:
2999   {
3000     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are %s",
3001         gst_flow_get_name (queue->srcresult));
3002     return queue->srcresult;
3003   }
3004 }
3005
3006 /* called repeatedly with @pad as the source pad. This function should push out
3007  * data to the peer element. */
3008 static void
3009 gst_queue2_loop (GstPad * pad)
3010 {
3011   GstQueue2 *queue;
3012   GstFlowReturn ret;
3013
3014   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
3015
3016   /* have to lock for thread-safety */
3017   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3018
3019   if (gst_queue2_is_empty (queue)) {
3020     gboolean started;
3021
3022     /* pause the timer while we wait. The fact that we are waiting does not mean
3023      * the byterate on the output pad is lower */
3024     if ((started = queue->out_timer_started))
3025       g_timer_stop (queue->out_timer);
3026
3027     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
3028         "queue is empty, waiting for new data");
3029     do {
3030       /* Wait for data to be available, we could be unlocked because of a flush. */
3031       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
3032     }
3033     while (gst_queue2_is_empty (queue));
3034
3035     /* and continue if we were running before */
3036     if (started)
3037       g_timer_continue (queue->out_timer);
3038   }
3039   ret = gst_queue2_push_one (queue);
3040   queue->srcresult = ret;
3041   queue->sinkresult = ret;
3042   if (ret != GST_FLOW_OK)
3043     goto out_flushing;
3044
3045   GST_QUEUE2_MUTEX_UNLOCK (queue);
3046   gst_queue2_post_buffering (queue);
3047
3048   return;
3049
3050   /* ERRORS */
3051 out_flushing:
3052   {
3053     gboolean eos = queue->is_eos;
3054     GstFlowReturn ret = queue->srcresult;
3055
3056     gst_pad_pause_task (queue->srcpad);
3057     if (ret == GST_FLOW_FLUSHING) {
3058       gst_queue2_locked_flush (queue, FALSE, FALSE);
3059     } else {
3060       GST_QUEUE2_SIGNAL_DEL (queue);
3061       queue->last_query = FALSE;
3062       g_cond_signal (&queue->query_handled);
3063     }
3064     GST_QUEUE2_MUTEX_UNLOCK (queue);
3065     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3066         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
3067     /* Recalculate buffering levels before stopping since the source flow
3068      * might cause a different buffering level (like NOT_LINKED making
3069      * the queue appear as full) */
3070     if (queue->use_buffering)
3071       update_buffering (queue);
3072     gst_queue2_post_buffering (queue);
3073     /* let app know about us giving up if upstream is not expected to do so */
3074     /* EOS is already taken care of elsewhere */
3075     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
3076       GST_ELEMENT_FLOW_ERROR (queue, ret);
3077       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
3078     }
3079     return;
3080   }
3081 }
3082
3083 static gboolean
3084 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3085 {
3086   gboolean res = TRUE;
3087   GstQueue2 *queue = GST_QUEUE2 (parent);
3088
3089 #ifndef GST_DISABLE_GST_DEBUG
3090   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
3091       event, GST_EVENT_TYPE_NAME (event));
3092 #endif
3093
3094   switch (GST_EVENT_TYPE (event)) {
3095     case GST_EVENT_FLUSH_START:
3096       if (QUEUE_IS_USING_QUEUE (queue)) {
3097         /* just forward upstream */
3098         res = gst_pad_push_event (queue->sinkpad, event);
3099       } else {
3100         /* now unblock the getrange function */
3101         GST_QUEUE2_MUTEX_LOCK (queue);
3102         GST_DEBUG_OBJECT (queue, "flushing");
3103         queue->srcresult = GST_FLOW_FLUSHING;
3104         GST_QUEUE2_SIGNAL_ADD (queue);
3105         GST_QUEUE2_MUTEX_UNLOCK (queue);
3106
3107         /* when using a temp file, we eat the event */
3108         res = TRUE;
3109         gst_event_unref (event);
3110       }
3111       break;
3112     case GST_EVENT_FLUSH_STOP:
3113       if (QUEUE_IS_USING_QUEUE (queue)) {
3114         /* just forward upstream */
3115         res = gst_pad_push_event (queue->sinkpad, event);
3116       } else {
3117         /* now unblock the getrange function */
3118         GST_QUEUE2_MUTEX_LOCK (queue);
3119         queue->srcresult = GST_FLOW_OK;
3120         GST_QUEUE2_MUTEX_UNLOCK (queue);
3121
3122         /* when using a temp file, we eat the event */
3123         res = TRUE;
3124         gst_event_unref (event);
3125       }
3126       break;
3127     case GST_EVENT_RECONFIGURE:
3128       GST_QUEUE2_MUTEX_LOCK (queue);
3129       /* assume downstream is linked now and try to push again */
3130       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
3131         queue->srcresult = GST_FLOW_OK;
3132         queue->sinkresult = GST_FLOW_OK;
3133         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
3134           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
3135               NULL);
3136         }
3137       }
3138       GST_QUEUE2_MUTEX_UNLOCK (queue);
3139
3140       res = gst_pad_push_event (queue->sinkpad, event);
3141       break;
3142     default:
3143       res = gst_pad_push_event (queue->sinkpad, event);
3144       break;
3145   }
3146
3147   return res;
3148 }
3149
3150 static gboolean
3151 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3152 {
3153   GstQueue2 *queue;
3154
3155   queue = GST_QUEUE2 (parent);
3156
3157   switch (GST_QUERY_TYPE (query)) {
3158     case GST_QUERY_POSITION:
3159     {
3160       gint64 peer_pos;
3161       GstFormat format;
3162
3163       if (!gst_pad_peer_query (queue->sinkpad, query))
3164         goto peer_failed;
3165
3166       /* get peer position */
3167       gst_query_parse_position (query, &format, &peer_pos);
3168
3169       /* FIXME: this code assumes that there's no discont in the queue */
3170       switch (format) {
3171         case GST_FORMAT_BYTES:
3172           peer_pos -= queue->cur_level.bytes;
3173           if (peer_pos < 0)     /* Clamp result to 0 */
3174             peer_pos = 0;
3175           break;
3176         case GST_FORMAT_TIME:
3177           peer_pos -= queue->cur_level.time;
3178           if (peer_pos < 0)     /* Clamp result to 0 */
3179             peer_pos = 0;
3180           break;
3181         default:
3182           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3183               "know how to adjust value", gst_format_get_name (format));
3184           return FALSE;
3185       }
3186       /* set updated position */
3187       gst_query_set_position (query, format, peer_pos);
3188       break;
3189     }
3190     case GST_QUERY_DURATION:
3191     {
3192       GST_DEBUG_OBJECT (queue, "doing peer query");
3193
3194       if (!gst_pad_peer_query (queue->sinkpad, query))
3195         goto peer_failed;
3196
3197       GST_DEBUG_OBJECT (queue, "peer query success");
3198       break;
3199     }
3200     case GST_QUERY_BUFFERING:
3201     {
3202       gint percent;
3203       gboolean is_buffering;
3204       GstBufferingMode mode;
3205       gint avg_in, avg_out;
3206       gint64 buffering_left;
3207
3208       GST_DEBUG_OBJECT (queue, "query buffering");
3209
3210       get_buffering_level (queue, &is_buffering, &percent);
3211       percent = convert_to_buffering_percent (queue, percent);
3212       gst_query_set_buffering_percent (query, is_buffering, percent);
3213
3214       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3215           &buffering_left);
3216       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3217           buffering_left);
3218
3219       if (!QUEUE_IS_USING_QUEUE (queue)) {
3220         /* add ranges for download and ringbuffer buffering */
3221         GstFormat format;
3222         gint64 start, stop, range_start, range_stop;
3223         guint64 writing_pos;
3224         gint64 estimated_total;
3225         gint64 duration;
3226         gboolean peer_res, is_eos;
3227         GstQueue2Range *queued_ranges;
3228
3229         /* we need a current download region */
3230         if (queue->current == NULL)
3231           return FALSE;
3232
3233         writing_pos = queue->current->writing_pos;
3234         is_eos = queue->is_eos;
3235
3236         if (is_eos) {
3237           /* we're EOS, we know the duration in bytes now */
3238           peer_res = TRUE;
3239           duration = writing_pos;
3240         } else {
3241           /* get duration of upstream in bytes */
3242           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3243               GST_FORMAT_BYTES, &duration);
3244         }
3245
3246         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3247             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3248
3249         /* calculate remaining and total download time */
3250         if (peer_res && avg_in > 0.0)
3251           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3252         else
3253           estimated_total = -1;
3254
3255         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3256             estimated_total);
3257
3258         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3259
3260         switch (format) {
3261           case GST_FORMAT_PERCENT:
3262             /* we need duration */
3263             if (!peer_res)
3264               goto peer_failed;
3265
3266             start = 0;
3267             /* get our available data relative to the duration */
3268             if (duration != -1)
3269               stop =
3270                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3271                   duration);
3272             else
3273               stop = -1;
3274             break;
3275           case GST_FORMAT_BYTES:
3276             start = 0;
3277             stop = writing_pos;
3278             break;
3279           default:
3280             start = -1;
3281             stop = -1;
3282             break;
3283         }
3284
3285         /* fill out the buffered ranges */
3286         for (queued_ranges = queue->ranges; queued_ranges;
3287             queued_ranges = queued_ranges->next) {
3288           switch (format) {
3289             case GST_FORMAT_PERCENT:
3290               if (duration == -1) {
3291                 range_start = 0;
3292                 range_stop = 0;
3293                 break;
3294               }
3295               range_start =
3296                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3297                   queued_ranges->offset, duration);
3298               range_stop =
3299                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3300                   queued_ranges->writing_pos, duration);
3301               break;
3302             case GST_FORMAT_BYTES:
3303               range_start = queued_ranges->offset;
3304               range_stop = queued_ranges->writing_pos;
3305               break;
3306             default:
3307               range_start = -1;
3308               range_stop = -1;
3309               break;
3310           }
3311           if (range_start == range_stop)
3312             continue;
3313           GST_DEBUG_OBJECT (queue,
3314               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3315               G_GINT64_FORMAT, range_start, range_stop);
3316           gst_query_add_buffering_range (query, range_start, range_stop);
3317         }
3318
3319         gst_query_set_buffering_range (query, format, start, stop,
3320             estimated_total);
3321       }
3322       break;
3323     }
3324     case GST_QUERY_SCHEDULING:
3325     {
3326       gboolean pull_mode;
3327       GstSchedulingFlags flags = 0;
3328
3329       if (!gst_pad_peer_query (queue->sinkpad, query))
3330         goto peer_failed;
3331
3332       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3333
3334       /* we can operate in pull mode when we are using a tempfile */
3335       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3336
3337       if (pull_mode)
3338         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3339       gst_query_set_scheduling (query, flags, 0, -1, 0);
3340       if (pull_mode)
3341         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3342       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3343       break;
3344     }
3345     default:
3346       /* peer handled other queries */
3347       if (!gst_pad_query_default (pad, parent, query))
3348         goto peer_failed;
3349       break;
3350   }
3351
3352   return TRUE;
3353
3354   /* ERRORS */
3355 peer_failed:
3356   {
3357     GST_DEBUG_OBJECT (queue, "failed peer query");
3358     return FALSE;
3359   }
3360 }
3361
3362 static gboolean
3363 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3364 {
3365   GstQueue2 *queue = GST_QUEUE2 (element);
3366
3367   /* simply forward to the srcpad query function */
3368   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3369       query);
3370 }
3371
3372 static void
3373 gst_queue2_update_upstream_size (GstQueue2 * queue)
3374 {
3375   gint64 upstream_size = -1;
3376
3377   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3378           &upstream_size)) {
3379     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3380
3381     /* upstream_size can be negative but queue->upstream_size is unsigned.
3382      * Prevent setting negative values to it (the query can return -1) */
3383     if (upstream_size >= 0)
3384       queue->upstream_size = upstream_size;
3385     else
3386       queue->upstream_size = 0;
3387   }
3388 }
3389
3390 static GstFlowReturn
3391 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3392     guint length, GstBuffer ** buffer)
3393 {
3394   GstQueue2 *queue;
3395   GstFlowReturn ret;
3396
3397   queue = GST_QUEUE2_CAST (parent);
3398
3399   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3400   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3401   offset = (offset == -1) ? queue->current->reading_pos : offset;
3402
3403   GST_DEBUG_OBJECT (queue,
3404       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3405
3406   /* catch any reads beyond the size of the file here to make sure queue2
3407    * doesn't send seek events beyond the size of the file upstream, since
3408    * that would confuse elements such as souphttpsrc and/or http servers.
3409    * Demuxers often just loop until EOS at the end of the file to figure out
3410    * when they've read all the end-headers or index chunks. */
3411   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3412     gst_queue2_update_upstream_size (queue);
3413     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3414       goto out_unexpected;
3415   }
3416
3417   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3418     gst_queue2_update_upstream_size (queue);
3419     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3420       length = queue->upstream_size - offset;
3421       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3422     }
3423   }
3424
3425   /* FIXME - function will block when the range is not yet available */
3426   ret = gst_queue2_create_read (queue, offset, length, buffer);
3427   GST_QUEUE2_MUTEX_UNLOCK (queue);
3428   gst_queue2_post_buffering (queue);
3429
3430   return ret;
3431
3432   /* ERRORS */
3433 out_flushing:
3434   {
3435     ret = queue->srcresult;
3436
3437     GST_DEBUG_OBJECT (queue, "we are %s", gst_flow_get_name (ret));
3438     GST_QUEUE2_MUTEX_UNLOCK (queue);
3439     return ret;
3440   }
3441 out_unexpected:
3442   {
3443     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3444     GST_QUEUE2_MUTEX_UNLOCK (queue);
3445     return GST_FLOW_EOS;
3446   }
3447 }
3448
3449 /* sink currently only operates in push mode */
3450 static gboolean
3451 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3452     GstPadMode mode, gboolean active)
3453 {
3454   gboolean result;
3455   GstQueue2 *queue;
3456
3457   queue = GST_QUEUE2 (parent);
3458
3459   switch (mode) {
3460     case GST_PAD_MODE_PUSH:
3461       if (active) {
3462         GST_QUEUE2_MUTEX_LOCK (queue);
3463         GST_DEBUG_OBJECT (queue, "activating push mode");
3464         queue->srcresult = GST_FLOW_OK;
3465         queue->sinkresult = GST_FLOW_OK;
3466         queue->is_eos = FALSE;
3467         queue->unexpected = FALSE;
3468         reset_rate_timer (queue);
3469         GST_QUEUE2_MUTEX_UNLOCK (queue);
3470       } else {
3471         /* unblock chain function */
3472         GST_QUEUE2_MUTEX_LOCK (queue);
3473         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3474         queue->srcresult = GST_FLOW_FLUSHING;
3475         queue->sinkresult = GST_FLOW_FLUSHING;
3476         GST_QUEUE2_SIGNAL_DEL (queue);
3477         GST_QUEUE2_MUTEX_UNLOCK (queue);
3478
3479         /* wait until it is unblocked and clean up */
3480         GST_PAD_STREAM_LOCK (pad);
3481         GST_QUEUE2_MUTEX_LOCK (queue);
3482         gst_queue2_locked_flush (queue, TRUE, FALSE);
3483         GST_QUEUE2_MUTEX_UNLOCK (queue);
3484         GST_PAD_STREAM_UNLOCK (pad);
3485       }
3486       result = TRUE;
3487       break;
3488     default:
3489       result = FALSE;
3490       break;
3491   }
3492   return result;
3493 }
3494
3495 /* src operating in push mode, we start a task on the source pad that pushes out
3496  * buffers from the queue */
3497 static gboolean
3498 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3499 {
3500   gboolean result = FALSE;
3501   GstQueue2 *queue;
3502
3503   queue = GST_QUEUE2 (parent);
3504
3505   if (active) {
3506     GST_QUEUE2_MUTEX_LOCK (queue);
3507     GST_DEBUG_OBJECT (queue, "activating push mode");
3508     queue->srcresult = GST_FLOW_OK;
3509     queue->sinkresult = GST_FLOW_OK;
3510     queue->is_eos = FALSE;
3511     queue->unexpected = FALSE;
3512     result =
3513         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3514     GST_QUEUE2_MUTEX_UNLOCK (queue);
3515   } else {
3516     /* unblock loop function */
3517     GST_QUEUE2_MUTEX_LOCK (queue);
3518     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3519     queue->srcresult = GST_FLOW_FLUSHING;
3520     queue->sinkresult = GST_FLOW_FLUSHING;
3521     /* the item add signal will unblock */
3522     GST_QUEUE2_SIGNAL_ADD (queue);
3523     GST_QUEUE2_MUTEX_UNLOCK (queue);
3524
3525     /* step 2, make sure streaming finishes */
3526     result = gst_pad_stop_task (pad);
3527
3528     GST_QUEUE2_MUTEX_LOCK (queue);
3529     gst_queue2_locked_flush (queue, FALSE, FALSE);
3530     GST_QUEUE2_MUTEX_UNLOCK (queue);
3531   }
3532
3533   return result;
3534 }
3535
3536 /* pull mode, downstream will call our getrange function */
3537 static gboolean
3538 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3539 {
3540   gboolean result;
3541   GstQueue2 *queue;
3542
3543   queue = GST_QUEUE2 (parent);
3544
3545   if (active) {
3546     GST_QUEUE2_MUTEX_LOCK (queue);
3547     if (!QUEUE_IS_USING_QUEUE (queue)) {
3548       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3549         /* open the temp file now */
3550         result = gst_queue2_open_temp_location_file (queue);
3551       } else if (!queue->ring_buffer) {
3552         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3553         result = ! !queue->ring_buffer;
3554       } else {
3555         result = TRUE;
3556       }
3557
3558       GST_DEBUG_OBJECT (queue, "activating pull mode");
3559       init_ranges (queue);
3560       queue->srcresult = GST_FLOW_OK;
3561       queue->sinkresult = GST_FLOW_OK;
3562       queue->is_eos = FALSE;
3563       queue->unexpected = FALSE;
3564       queue->upstream_size = 0;
3565     } else {
3566       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3567       /* this is not allowed, we cannot operate in pull mode without a temp
3568        * file. */
3569       queue->srcresult = GST_FLOW_FLUSHING;
3570       queue->sinkresult = GST_FLOW_FLUSHING;
3571       result = FALSE;
3572     }
3573     GST_QUEUE2_MUTEX_UNLOCK (queue);
3574   } else {
3575     GST_QUEUE2_MUTEX_LOCK (queue);
3576     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3577     queue->srcresult = GST_FLOW_FLUSHING;
3578     queue->sinkresult = GST_FLOW_FLUSHING;
3579     /* this will unlock getrange */
3580     GST_QUEUE2_SIGNAL_ADD (queue);
3581     result = TRUE;
3582     GST_QUEUE2_MUTEX_UNLOCK (queue);
3583   }
3584
3585   return result;
3586 }
3587
3588 static gboolean
3589 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3590     gboolean active)
3591 {
3592   gboolean res;
3593
3594   switch (mode) {
3595     case GST_PAD_MODE_PULL:
3596       res = gst_queue2_src_activate_pull (pad, parent, active);
3597       break;
3598     case GST_PAD_MODE_PUSH:
3599       res = gst_queue2_src_activate_push (pad, parent, active);
3600       break;
3601     default:
3602       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3603       res = FALSE;
3604       break;
3605   }
3606   return res;
3607 }
3608
3609 static GstStateChangeReturn
3610 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3611 {
3612   GstQueue2 *queue;
3613   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3614
3615   queue = GST_QUEUE2 (element);
3616
3617   switch (transition) {
3618     case GST_STATE_CHANGE_NULL_TO_READY:
3619       break;
3620     case GST_STATE_CHANGE_READY_TO_PAUSED:
3621       GST_QUEUE2_MUTEX_LOCK (queue);
3622       if (!QUEUE_IS_USING_QUEUE (queue)) {
3623         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3624           if (!gst_queue2_open_temp_location_file (queue))
3625             ret = GST_STATE_CHANGE_FAILURE;
3626         } else {
3627           if (queue->ring_buffer) {
3628             g_free (queue->ring_buffer);
3629             queue->ring_buffer = NULL;
3630           }
3631           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3632             ret = GST_STATE_CHANGE_FAILURE;
3633         }
3634         init_ranges (queue);
3635       }
3636       queue->segment_event_received = FALSE;
3637       queue->starting_segment = NULL;
3638       gst_event_replace (&queue->stream_start_event, NULL);
3639       GST_QUEUE2_MUTEX_UNLOCK (queue);
3640       break;
3641     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3642       break;
3643     default:
3644       break;
3645   }
3646
3647   if (ret == GST_STATE_CHANGE_FAILURE)
3648     return ret;
3649
3650   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3651
3652   if (ret == GST_STATE_CHANGE_FAILURE)
3653     return ret;
3654
3655   switch (transition) {
3656     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3657       break;
3658     case GST_STATE_CHANGE_PAUSED_TO_READY:
3659       GST_QUEUE2_MUTEX_LOCK (queue);
3660       if (!QUEUE_IS_USING_QUEUE (queue)) {
3661         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3662           gst_queue2_close_temp_location_file (queue);
3663         } else if (queue->ring_buffer) {
3664           g_free (queue->ring_buffer);
3665           queue->ring_buffer = NULL;
3666         }
3667         clean_ranges (queue);
3668       }
3669       if (queue->starting_segment != NULL) {
3670         gst_event_unref (queue->starting_segment);
3671         queue->starting_segment = NULL;
3672       }
3673       gst_event_replace (&queue->stream_start_event, NULL);
3674       GST_QUEUE2_MUTEX_UNLOCK (queue);
3675       break;
3676     case GST_STATE_CHANGE_READY_TO_NULL:
3677       break;
3678     default:
3679       break;
3680   }
3681
3682   return ret;
3683 }
3684
3685 /* changing the capacity of the queue must wake up
3686  * the _chain function, it might have more room now
3687  * to store the buffer/event in the queue */
3688 #define QUEUE_CAPACITY_CHANGE(q) \
3689   GST_QUEUE2_SIGNAL_DEL (queue); \
3690   if (queue->use_buffering)      \
3691     update_buffering (queue);
3692
3693 /* Changing the minimum required fill level must
3694  * wake up the _loop function as it might now
3695  * be able to preceed.
3696  */
3697 #define QUEUE_THRESHOLD_CHANGE(q)\
3698   GST_QUEUE2_SIGNAL_ADD (queue);
3699
3700 static void
3701 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3702 {
3703   GstState state;
3704
3705   /* the element must be stopped in order to do this */
3706   GST_OBJECT_LOCK (queue);
3707   state = GST_STATE (queue);
3708   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3709     goto wrong_state;
3710   GST_OBJECT_UNLOCK (queue);
3711
3712   /* set new location */
3713   g_free (queue->temp_template);
3714   queue->temp_template = g_strdup (template);
3715
3716   return;
3717
3718 /* ERROR */
3719 wrong_state:
3720   {
3721     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3722     GST_OBJECT_UNLOCK (queue);
3723   }
3724 }
3725
3726 static void
3727 gst_queue2_set_property (GObject * object,
3728     guint prop_id, const GValue * value, GParamSpec * pspec)
3729 {
3730   GstQueue2 *queue = GST_QUEUE2 (object);
3731
3732   /* someone could change levels here, and since this
3733    * affects the get/put funcs, we need to lock for safety. */
3734   GST_QUEUE2_MUTEX_LOCK (queue);
3735
3736   switch (prop_id) {
3737     case PROP_MAX_SIZE_BYTES:
3738       queue->max_level.bytes = g_value_get_uint (value);
3739       QUEUE_CAPACITY_CHANGE (queue);
3740       break;
3741     case PROP_MAX_SIZE_BUFFERS:
3742       queue->max_level.buffers = g_value_get_uint (value);
3743       QUEUE_CAPACITY_CHANGE (queue);
3744       break;
3745     case PROP_MAX_SIZE_TIME:
3746       queue->max_level.time = g_value_get_uint64 (value);
3747       /* set rate_time to the same value. We use an extra field in the level
3748        * structure so that we can easily access and compare it */
3749       queue->max_level.rate_time = queue->max_level.time;
3750       QUEUE_CAPACITY_CHANGE (queue);
3751       break;
3752     case PROP_USE_BUFFERING:
3753       queue->use_buffering = g_value_get_boolean (value);
3754       if (!queue->use_buffering && queue->is_buffering) {
3755         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3756             "posting 100%% message");
3757         SET_PERCENT (queue, 100);
3758         queue->is_buffering = FALSE;
3759       }
3760
3761       if (queue->use_buffering) {
3762         queue->is_buffering = TRUE;
3763         update_buffering (queue);
3764       }
3765       break;
3766     case PROP_USE_TAGS_BITRATE:
3767       queue->use_tags_bitrate = g_value_get_boolean (value);
3768       break;
3769     case PROP_USE_RATE_ESTIMATE:
3770       queue->use_rate_estimate = g_value_get_boolean (value);
3771       break;
3772     case PROP_LOW_PERCENT:
3773       queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3774       if (queue->is_buffering)
3775         update_buffering (queue);
3776       break;
3777     case PROP_HIGH_PERCENT:
3778       queue->high_watermark =
3779           g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3780       if (queue->is_buffering)
3781         update_buffering (queue);
3782       break;
3783     case PROP_LOW_WATERMARK:
3784       queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3785       if (queue->is_buffering)
3786         update_buffering (queue);
3787       break;
3788     case PROP_HIGH_WATERMARK:
3789       queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3790       if (queue->is_buffering)
3791         update_buffering (queue);
3792       break;
3793     case PROP_TEMP_TEMPLATE:
3794       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3795       break;
3796     case PROP_TEMP_REMOVE:
3797       queue->temp_remove = g_value_get_boolean (value);
3798       break;
3799     case PROP_RING_BUFFER_MAX_SIZE:
3800       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3801       break;
3802     default:
3803       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3804       break;
3805   }
3806
3807   GST_QUEUE2_MUTEX_UNLOCK (queue);
3808   gst_queue2_post_buffering (queue);
3809 }
3810
3811 static void
3812 gst_queue2_get_property (GObject * object,
3813     guint prop_id, GValue * value, GParamSpec * pspec)
3814 {
3815   GstQueue2 *queue = GST_QUEUE2 (object);
3816
3817   GST_QUEUE2_MUTEX_LOCK (queue);
3818
3819   switch (prop_id) {
3820     case PROP_CUR_LEVEL_BYTES:
3821       g_value_set_uint (value, queue->cur_level.bytes);
3822       break;
3823     case PROP_CUR_LEVEL_BUFFERS:
3824       g_value_set_uint (value, queue->cur_level.buffers);
3825       break;
3826     case PROP_CUR_LEVEL_TIME:
3827       g_value_set_uint64 (value, queue->cur_level.time);
3828       break;
3829     case PROP_MAX_SIZE_BYTES:
3830       g_value_set_uint (value, queue->max_level.bytes);
3831       break;
3832     case PROP_MAX_SIZE_BUFFERS:
3833       g_value_set_uint (value, queue->max_level.buffers);
3834       break;
3835     case PROP_MAX_SIZE_TIME:
3836       g_value_set_uint64 (value, queue->max_level.time);
3837       break;
3838     case PROP_USE_BUFFERING:
3839       g_value_set_boolean (value, queue->use_buffering);
3840       break;
3841     case PROP_USE_TAGS_BITRATE:
3842       g_value_set_boolean (value, queue->use_tags_bitrate);
3843       break;
3844     case PROP_USE_RATE_ESTIMATE:
3845       g_value_set_boolean (value, queue->use_rate_estimate);
3846       break;
3847     case PROP_LOW_PERCENT:
3848       g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
3849       break;
3850     case PROP_HIGH_PERCENT:
3851       g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
3852       break;
3853     case PROP_LOW_WATERMARK:
3854       g_value_set_double (value, queue->low_watermark /
3855           (gdouble) MAX_BUFFERING_LEVEL);
3856       break;
3857     case PROP_HIGH_WATERMARK:
3858       g_value_set_double (value, queue->high_watermark /
3859           (gdouble) MAX_BUFFERING_LEVEL);
3860       break;
3861     case PROP_TEMP_TEMPLATE:
3862       g_value_set_string (value, queue->temp_template);
3863       break;
3864     case PROP_TEMP_LOCATION:
3865       g_value_set_string (value, queue->temp_location);
3866       break;
3867     case PROP_TEMP_REMOVE:
3868       g_value_set_boolean (value, queue->temp_remove);
3869       break;
3870     case PROP_RING_BUFFER_MAX_SIZE:
3871       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3872       break;
3873     case PROP_AVG_IN_RATE:
3874     {
3875       gdouble in_rate = queue->byte_in_rate;
3876
3877       /* During the first RATE_INTERVAL, byte_in_rate will not have been
3878        * calculated, so calculate it here. */
3879       if (in_rate == 0.0 && queue->bytes_in
3880           && queue->last_update_in_rates_elapsed > 0.0)
3881         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
3882
3883       g_value_set_int64 (value, (gint64) in_rate);
3884       break;
3885     }
3886     default:
3887       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3888       break;
3889   }
3890
3891   GST_QUEUE2_MUTEX_UNLOCK (queue);
3892 }