apply tizen build option rule
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue2:current-level-buffers property.
40  *
41  * The default queue size limits are 100 buffers, 2MB of data, or
42  * two seconds worth of data, whichever is reached first.
43  *
44  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
45  * will allocate a random free filename and buffer data in the file.
46  * By using this, it will buffer the entire stream data on the file independently
47  * of the queue size limits, they will only be used for buffering statistics.
48  *
49  * The temp-location property will be used to notify the application of the
50  * allocated filename.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56
57 #include "gstqueue2.h"
58
59 #include <glib/gstdio.h>
60
61 #include "gst/gst-i18n-lib.h"
62 #include "gst/glib-compat-private.h"
63
64 #include <string.h>
65
66 #ifdef G_OS_WIN32
67 #include <io.h>                 /* lseek, open, close, read */
68 #undef lseek
69 #define lseek _lseeki64
70 #undef off_t
71 #define off_t guint64
72 #else
73 #include <unistd.h>
74 #endif
75
76 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
77     GST_PAD_SINK,
78     GST_PAD_ALWAYS,
79     GST_STATIC_CAPS_ANY);
80
81 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
82     GST_PAD_SRC,
83     GST_PAD_ALWAYS,
84     GST_STATIC_CAPS_ANY);
85
86 GST_DEBUG_CATEGORY_STATIC (queue_debug);
87 #define GST_CAT_DEFAULT (queue_debug)
88 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
89
90 enum
91 {
92   LAST_SIGNAL
93 };
94
95 /* other defines */
96 #define DEFAULT_BUFFER_SIZE 4096
97 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
98 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
99 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
100
101 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
102
103 /* default property values */
104 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
105 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
106 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
107 #define DEFAULT_USE_BUFFERING      FALSE
108 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
109 #define DEFAULT_LOW_PERCENT        10
110 #define DEFAULT_HIGH_PERCENT       99
111 #define DEFAULT_TEMP_REMOVE        TRUE
112 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
113
114 enum
115 {
116   PROP_0,
117   PROP_CUR_LEVEL_BUFFERS,
118   PROP_CUR_LEVEL_BYTES,
119   PROP_CUR_LEVEL_TIME,
120   PROP_MAX_SIZE_BUFFERS,
121   PROP_MAX_SIZE_BYTES,
122   PROP_MAX_SIZE_TIME,
123   PROP_USE_BUFFERING,
124   PROP_USE_RATE_ESTIMATE,
125   PROP_LOW_PERCENT,
126   PROP_HIGH_PERCENT,
127   PROP_TEMP_TEMPLATE,
128   PROP_TEMP_LOCATION,
129   PROP_TEMP_REMOVE,
130   PROP_RING_BUFFER_MAX_SIZE,
131   PROP_LAST
132 };
133
134 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
135   l.buffers = 0;                                        \
136   l.bytes = 0;                                          \
137   l.time = 0;                                           \
138   l.rate_time = 0;                                      \
139 } G_STMT_END
140
141 #define STATUS(queue, pad, msg) \
142   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
143                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
144                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
145                       " ns, %"G_GUINT64_FORMAT" items", \
146                       GST_DEBUG_PAD_NAME (pad), \
147                       queue->cur_level.buffers, \
148                       queue->max_level.buffers, \
149                       queue->cur_level.bytes, \
150                       queue->max_level.bytes, \
151                       queue->cur_level.time, \
152                       queue->max_level.time, \
153                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
154                         queue->current->writing_pos - queue->current->max_reading_pos : \
155                         queue->queue.length))
156
157 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
158   g_mutex_lock (&q->qlock);                                              \
159 } G_STMT_END
160
161 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
162   GST_QUEUE2_MUTEX_LOCK (q);                                            \
163   if (res != GST_FLOW_OK)                                               \
164     goto label;                                                         \
165 } G_STMT_END
166
167 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
168   g_mutex_unlock (&q->qlock);                                            \
169 } G_STMT_END
170
171 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
172   STATUS (queue, q->sinkpad, "wait for DEL");                           \
173   q->waiting_del = TRUE;                                                \
174   g_cond_wait (&q->item_del, &queue->qlock);                              \
175   q->waiting_del = FALSE;                                               \
176   if (res != GST_FLOW_OK) {                                             \
177     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
178     goto label;                                                         \
179   }                                                                     \
180   STATUS (queue, q->sinkpad, "received DEL");                           \
181 } G_STMT_END
182
183 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
184   STATUS (queue, q->srcpad, "wait for ADD");                            \
185   q->waiting_add = TRUE;                                                \
186   g_cond_wait (&q->item_add, &q->qlock);                                  \
187   q->waiting_add = FALSE;                                               \
188   if (res != GST_FLOW_OK) {                                             \
189     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
190     goto label;                                                         \
191   }                                                                     \
192   STATUS (queue, q->srcpad, "received ADD");                            \
193 } G_STMT_END
194
195 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
196   if (q->waiting_del) {                                                 \
197     STATUS (q, q->srcpad, "signal DEL");                                \
198     g_cond_signal (&q->item_del);                                        \
199   }                                                                     \
200 } G_STMT_END
201
202 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
203   if (q->waiting_add) {                                                 \
204     STATUS (q, q->sinkpad, "signal ADD");                               \
205     g_cond_signal (&q->item_add);                                        \
206   }                                                                     \
207 } G_STMT_END
208
209 #define SET_PERCENT(q, perc) G_STMT_START {                              \
210   if (perc != q->buffering_percent) {                                    \
211     q->buffering_percent = perc;                                         \
212     q->percent_changed = TRUE;                                           \
213     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
214     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
215         &q->buffering_left);                                             \
216   }                                                                      \
217 } G_STMT_END
218
219 #define _do_init \
220     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
221     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
222         "dataflow inside the queue element");
223 #define gst_queue2_parent_class parent_class
224 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
225
226 static void gst_queue2_finalize (GObject * object);
227
228 static void gst_queue2_set_property (GObject * object,
229     guint prop_id, const GValue * value, GParamSpec * pspec);
230 static void gst_queue2_get_property (GObject * object,
231     guint prop_id, GValue * value, GParamSpec * pspec);
232
233 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
234     GstBuffer * buffer);
235 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
236     GstBufferList * buffer_list);
237 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
238 static void gst_queue2_loop (GstPad * pad);
239
240 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
241     GstEvent * event);
242 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
243     GstQuery * query);
244
245 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
246     GstEvent * event);
247 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
248     GstQuery * query);
249 static gboolean gst_queue2_handle_query (GstElement * element,
250     GstQuery * query);
251
252 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
253     guint64 offset, guint length, GstBuffer ** buffer);
254
255 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
256     GstPadMode mode, gboolean active);
257 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
258     GstPadMode mode, gboolean active);
259 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
260     GstStateChange transition);
261
262 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
263 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
264
265 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
266 static void update_in_rates (GstQueue2 * queue);
267 static void gst_queue2_post_buffering (GstQueue2 * queue);
268 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
269 static gboolean change_current_range(GstQueue2 * queue, GstQueue2Range *req_range, guint64 offset, guint length);
270 #endif
271
272 typedef enum
273 {
274   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
275   GST_QUEUE2_ITEM_TYPE_BUFFER,
276   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
277   GST_QUEUE2_ITEM_TYPE_EVENT,
278   GST_QUEUE2_ITEM_TYPE_QUERY
279 } GstQueue2ItemType;
280
281 typedef struct
282 {
283   GstQueue2ItemType type;
284   GstMiniObject *item;
285 } GstQueue2Item;
286
287 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
288
289 static void
290 gst_queue2_class_init (GstQueue2Class * klass)
291 {
292   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
293   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
294
295   gobject_class->set_property = gst_queue2_set_property;
296   gobject_class->get_property = gst_queue2_get_property;
297
298   /* properties */
299   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
300       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
301           "Current amount of data in the queue (bytes)",
302           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
303   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
304       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
305           "Current number of buffers in the queue",
306           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
307   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
308       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
309           "Current amount of data in the queue (in ns)",
310           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
311
312   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
313       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
314           "Max. amount of data in the queue (bytes, 0=disable)",
315           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
316           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
317           G_PARAM_STATIC_STRINGS));
318   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
319       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
320           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
321           DEFAULT_MAX_SIZE_BUFFERS,
322           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
323           G_PARAM_STATIC_STRINGS));
324   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
325       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
326           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
327           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
328           G_PARAM_STATIC_STRINGS));
329
330   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
331       g_param_spec_boolean ("use-buffering", "Use buffering",
332           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
333           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
334           G_PARAM_STATIC_STRINGS));
335   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
336       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
337           "Estimate the bitrate of the stream to calculate time level",
338           DEFAULT_USE_RATE_ESTIMATE,
339           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
340   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
341       g_param_spec_int ("low-percent", "Low percent",
342           "Low threshold for buffering to start. Only used if use-buffering is True",
343           0, 100, DEFAULT_LOW_PERCENT,
344           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
345   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
346       g_param_spec_int ("high-percent", "High percent",
347           "High threshold for buffering to finish. Only used if use-buffering is True",
348           0, 100, DEFAULT_HIGH_PERCENT,
349           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350
351   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
352       g_param_spec_string ("temp-template", "Temporary File Template",
353           "File template to store temporary files in, should contain directory "
354           "and XXXXXX. (NULL == disabled)",
355           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356
357   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
358       g_param_spec_string ("temp-location", "Temporary File Location",
359           "Location to store temporary files in (Only read this property, "
360           "use temp-template to configure the name template)",
361           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
362
363   /**
364    * GstQueue2:temp-remove
365    *
366    * When temp-template is set, remove the temporary file when going to READY.
367    */
368   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
369       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
370           "Remove the temp-location after use",
371           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
372
373   /**
374    * GstQueue2:ring-buffer-max-size
375    *
376    * The maximum size of the ring buffer in bytes. If set to 0, the ring
377    * buffer is disabled. Default 0.
378    */
379   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
380       g_param_spec_uint64 ("ring-buffer-max-size",
381           "Max. ring buffer size (bytes)",
382           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
383           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
384           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
385
386   /* set several parent class virtual functions */
387   gobject_class->finalize = gst_queue2_finalize;
388
389   gst_element_class_add_pad_template (gstelement_class,
390       gst_static_pad_template_get (&srctemplate));
391   gst_element_class_add_pad_template (gstelement_class,
392       gst_static_pad_template_get (&sinktemplate));
393
394   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
395       "Generic",
396       "Simple data queue",
397       "Erik Walthinsen <omega@cse.ogi.edu>, "
398       "Wim Taymans <wim.taymans@gmail.com>");
399
400   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
401   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
402 }
403
404 static void
405 gst_queue2_init (GstQueue2 * queue)
406 {
407   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
408
409   gst_pad_set_chain_function (queue->sinkpad,
410       GST_DEBUG_FUNCPTR (gst_queue2_chain));
411   gst_pad_set_chain_list_function (queue->sinkpad,
412       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
413   gst_pad_set_activatemode_function (queue->sinkpad,
414       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
415   gst_pad_set_event_function (queue->sinkpad,
416       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
417   gst_pad_set_query_function (queue->sinkpad,
418       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
419   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
420   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
421
422   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
423
424   gst_pad_set_activatemode_function (queue->srcpad,
425       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
426   gst_pad_set_getrange_function (queue->srcpad,
427       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
428   gst_pad_set_event_function (queue->srcpad,
429       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
430   gst_pad_set_query_function (queue->srcpad,
431       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
432   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
433   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
434
435   /* levels */
436   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
437   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
438   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
439   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
440   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
441   queue->use_buffering = DEFAULT_USE_BUFFERING;
442   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
443   queue->low_percent = DEFAULT_LOW_PERCENT;
444   queue->high_percent = DEFAULT_HIGH_PERCENT;
445
446   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
447   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
448
449   queue->sinktime = GST_CLOCK_TIME_NONE;
450   queue->srctime = GST_CLOCK_TIME_NONE;
451   queue->sink_tainted = TRUE;
452   queue->src_tainted = TRUE;
453
454   queue->srcresult = GST_FLOW_FLUSHING;
455   queue->sinkresult = GST_FLOW_FLUSHING;
456   queue->is_eos = FALSE;
457   queue->in_timer = g_timer_new ();
458   queue->out_timer = g_timer_new ();
459
460   g_mutex_init (&queue->qlock);
461   queue->waiting_add = FALSE;
462   g_cond_init (&queue->item_add);
463   queue->waiting_del = FALSE;
464   g_cond_init (&queue->item_del);
465   g_queue_init (&queue->queue);
466
467   g_cond_init (&queue->query_handled);
468   queue->last_query = FALSE;
469
470   g_mutex_init (&queue->buffering_post_lock);
471   queue->buffering_percent = 100;
472
473   /* tempfile related */
474   queue->temp_template = NULL;
475   queue->temp_location = NULL;
476   queue->temp_remove = DEFAULT_TEMP_REMOVE;
477
478   queue->ring_buffer = NULL;
479   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
480
481   GST_DEBUG_OBJECT (queue,
482       "initialized queue's not_empty & not_full conditions");
483 }
484
485 /* called only once, as opposed to dispose */
486 static void
487 gst_queue2_finalize (GObject * object)
488 {
489   GstQueue2 *queue = GST_QUEUE2 (object);
490
491   GST_DEBUG_OBJECT (queue, "finalizing queue");
492
493   while (!g_queue_is_empty (&queue->queue)) {
494     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
495
496     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
497       gst_mini_object_unref (qitem->item);
498     g_slice_free (GstQueue2Item, qitem);
499   }
500
501   queue->last_query = FALSE;
502   g_queue_clear (&queue->queue);
503   g_mutex_clear (&queue->qlock);
504   g_mutex_clear (&queue->buffering_post_lock);
505   g_cond_clear (&queue->item_add);
506   g_cond_clear (&queue->item_del);
507   g_cond_clear (&queue->query_handled);
508   g_timer_destroy (queue->in_timer);
509   g_timer_destroy (queue->out_timer);
510
511   /* temp_file path cleanup  */
512   g_free (queue->temp_template);
513   g_free (queue->temp_location);
514
515   G_OBJECT_CLASS (parent_class)->finalize (object);
516 }
517
518 static void
519 debug_ranges (GstQueue2 * queue)
520 {
521   GstQueue2Range *walk;
522
523   for (walk = queue->ranges; walk; walk = walk->next) {
524     GST_DEBUG_OBJECT (queue,
525         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
526         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
527         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
528         walk->rb_writing_pos, walk->reading_pos,
529         walk == queue->current ? "**y**" : "  n  ");
530   }
531 }
532
533 /* clear all the downloaded ranges */
534 static void
535 clean_ranges (GstQueue2 * queue)
536 {
537   GST_DEBUG_OBJECT (queue, "clean queue ranges");
538
539   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
540   queue->ranges = NULL;
541   queue->current = NULL;
542 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
543   queue->read = NULL;
544 #endif
545 }
546
547 /* find a range that contains @offset or NULL when nothing does */
548 static GstQueue2Range *
549 find_range (GstQueue2 * queue, guint64 offset)
550 {
551   GstQueue2Range *range = NULL;
552   GstQueue2Range *walk;
553
554   /* first do a quick check for the current range */
555   for (walk = queue->ranges; walk; walk = walk->next) {
556     if (offset >= walk->offset && offset <= walk->writing_pos) {
557       /* we can reuse an existing range */
558       range = walk;
559       break;
560     }
561   }
562   if (range) {
563     GST_DEBUG_OBJECT (queue,
564         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
565         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
566   } else {
567     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
568   }
569   return range;
570 }
571
572 static void
573 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
574 {
575   guint64 max_reading_pos, writing_pos;
576
577   writing_pos = range->writing_pos;
578   max_reading_pos = range->max_reading_pos;
579
580   if (writing_pos > max_reading_pos)
581     queue->cur_level.bytes = writing_pos - max_reading_pos;
582   else
583     queue->cur_level.bytes = 0;
584 }
585
586 /* make a new range for @offset or reuse an existing range */
587 static GstQueue2Range *
588 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
589 {
590   GstQueue2Range *range, *prev, *next;
591
592   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
593
594   if ((range = find_range (queue, offset))) {
595     GST_DEBUG_OBJECT (queue,
596         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
597         range->writing_pos);
598     if (update_existing && range->writing_pos != offset) {
599       GST_DEBUG_OBJECT (queue, "updating range writing position to "
600           "%" G_GUINT64_FORMAT, offset);
601       range->writing_pos = offset;
602     }
603   } else {
604     GST_DEBUG_OBJECT (queue,
605         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
606
607     range = g_slice_new0 (GstQueue2Range);
608     range->offset = offset;
609     /* we want to write to the next location in the ring buffer */
610     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
611     range->writing_pos = offset;
612     range->rb_writing_pos = range->rb_offset;
613     range->reading_pos = offset;
614     range->max_reading_pos = offset;
615
616     /* insert sorted */
617     prev = NULL;
618     next = queue->ranges;
619     while (next) {
620       if (next->offset > offset) {
621         /* insert before next */
622         GST_DEBUG_OBJECT (queue,
623             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
624             next->offset);
625         break;
626       }
627       /* try next */
628       prev = next;
629       next = next->next;
630     }
631     range->next = next;
632     if (prev)
633       prev->next = range;
634     else
635       queue->ranges = range;
636   }
637   debug_ranges (queue);
638
639   /* update the stats for this range */
640   update_cur_level (queue, range);
641
642   return range;
643 }
644
645
646 /* clear and init the download ranges for offset 0 */
647 static void
648 init_ranges (GstQueue2 * queue)
649 {
650   GST_DEBUG_OBJECT (queue, "init queue ranges");
651
652   /* get rid of all the current ranges */
653   clean_ranges (queue);
654   /* make a range for offset 0 */
655 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
656   queue->current = queue->read = add_range (queue, 0, TRUE);
657 #else
658   queue->current = add_range (queue, 0, TRUE);
659 #endif
660 }
661
662 /* calculate the diff between running time on the sink and src of the queue.
663  * This is the total amount of time in the queue. */
664 static void
665 update_time_level (GstQueue2 * queue)
666 {
667   if (queue->sink_tainted) {
668     queue->sinktime =
669         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
670         queue->sink_segment.position);
671     queue->sink_tainted = FALSE;
672   }
673
674   if (queue->src_tainted) {
675     queue->srctime =
676         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
677         queue->src_segment.position);
678     queue->src_tainted = FALSE;
679   }
680
681   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
682       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
683
684   if (queue->sinktime != GST_CLOCK_TIME_NONE
685       && queue->srctime != GST_CLOCK_TIME_NONE
686       && queue->sinktime >= queue->srctime)
687     queue->cur_level.time = queue->sinktime - queue->srctime;
688   else
689     queue->cur_level.time = 0;
690 }
691
692 /* take a SEGMENT event and apply the values to segment, updating the time
693  * level of queue. */
694 static void
695 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
696     gboolean is_sink)
697 {
698   gst_event_copy_segment (event, segment);
699
700   if (segment->format == GST_FORMAT_BYTES) {
701     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
702       /* start is where we'll be getting from and as such writing next */
703 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
704       queue->current = queue->read = add_range (queue, segment->start, TRUE);
705 #else
706       queue->current = add_range (queue, segment->start, TRUE);
707 #endif
708     }
709   }
710
711   /* now configure the values, we use these to track timestamps on the
712    * sinkpad. */
713   if (segment->format != GST_FORMAT_TIME) {
714     /* non-time format, pretent the current time segment is closed with a
715      * 0 start and unknown stop time. */
716     segment->format = GST_FORMAT_TIME;
717     segment->start = 0;
718     segment->stop = -1;
719     segment->time = 0;
720   }
721
722   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
723
724   if (is_sink)
725     queue->sink_tainted = TRUE;
726   else
727     queue->src_tainted = TRUE;
728
729   /* segment can update the time level of the queue */
730   update_time_level (queue);
731 }
732
733 static void
734 apply_gap (GstQueue2 * queue, GstEvent * event,
735     GstSegment * segment, gboolean is_sink)
736 {
737   GstClockTime timestamp;
738   GstClockTime duration;
739
740   gst_event_parse_gap (event, &timestamp, &duration);
741
742   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
743
744     if (GST_CLOCK_TIME_IS_VALID (duration)) {
745       timestamp += duration;
746     }
747
748     segment->position = timestamp;
749
750     if (is_sink)
751       queue->sink_tainted = TRUE;
752     else
753       queue->src_tainted = TRUE;
754
755     /* calc diff with other end */
756     update_time_level (queue);
757   }
758 }
759
760 /* take a buffer and update segment, updating the time level of the queue. */
761 static void
762 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
763     gboolean is_sink)
764 {
765   GstClockTime duration, timestamp;
766
767   timestamp = GST_BUFFER_TIMESTAMP (buffer);
768   duration = GST_BUFFER_DURATION (buffer);
769
770   /* if no timestamp is set, assume it's continuous with the previous
771    * time */
772   if (timestamp == GST_CLOCK_TIME_NONE)
773     timestamp = segment->position;
774
775   /* add duration */
776   if (duration != GST_CLOCK_TIME_NONE)
777     timestamp += duration;
778
779   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
780       GST_TIME_ARGS (timestamp));
781
782   segment->position = timestamp;
783
784   if (is_sink)
785     queue->sink_tainted = TRUE;
786   else
787     queue->src_tainted = TRUE;
788
789   /* calc diff with other end */
790   update_time_level (queue);
791 }
792
793 static gboolean
794 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
795 {
796   GstClockTime *timestamp = data;
797
798   GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT
799       " duration %" GST_TIME_FORMAT, idx,
800       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)),
801       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
802
803   if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf))
804     *timestamp = GST_BUFFER_TIMESTAMP (*buf);
805
806   if (GST_BUFFER_DURATION_IS_VALID (*buf))
807     *timestamp += GST_BUFFER_DURATION (*buf);
808
809   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
810   return TRUE;
811 }
812
813 /* take a buffer list and update segment, updating the time level of the queue */
814 static void
815 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
816     GstSegment * segment, gboolean is_sink)
817 {
818   GstClockTime timestamp;
819
820   /* if no timestamp is set, assume it's continuous with the previous time */
821   timestamp = segment->position;
822
823   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
824
825   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
826       GST_TIME_ARGS (timestamp));
827
828   segment->position = timestamp;
829
830   if (is_sink)
831     queue->sink_tainted = TRUE;
832   else
833     queue->src_tainted = TRUE;
834
835   /* calc diff with other end */
836   update_time_level (queue);
837 }
838
839 static gboolean
840 get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering,
841     gint * percent)
842 {
843   gint perc;
844
845   if (queue->high_percent <= 0) {
846     if (percent)
847       *percent = 100;
848     if (is_buffering)
849       *is_buffering = FALSE;
850     return FALSE;
851   }
852 #define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
853
854   if (queue->is_eos) {
855     /* on EOS we are always 100% full, we set the var here so that it we can
856      * reuse the logic below to stop buffering */
857     perc = 100;
858     GST_LOG_OBJECT (queue, "we are EOS");
859   } else {
860     /* figure out the percent we are filled, we take the max of all formats. */
861     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
862       perc = GET_PERCENT (bytes, 0);
863     } else {
864       guint64 rb_size = queue->ring_buffer_max_size;
865       perc = GET_PERCENT (bytes, rb_size);
866     }
867     perc = MAX (perc, GET_PERCENT (time, 0));
868     perc = MAX (perc, GET_PERCENT (buffers, 0));
869
870     /* also apply the rate estimate when we need to */
871     if (queue->use_rate_estimate)
872       perc = MAX (perc, GET_PERCENT (rate_time, 0));
873   }
874 #undef GET_PERCENT
875
876   if (is_buffering)
877     *is_buffering = queue->is_buffering;
878
879   /* scale to high percent so that it becomes the 100% mark */
880   perc = perc * 100 / queue->high_percent;
881   /* clip */
882   if (perc > 100)
883     perc = 100;
884
885   if (percent)
886     *percent = perc;
887
888   GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering,
889       perc);
890
891   return TRUE;
892 }
893
894 static void
895 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
896     gint * avg_in, gint * avg_out, gint64 * buffering_left)
897 {
898   if (mode) {
899     if (!QUEUE_IS_USING_QUEUE (queue)) {
900       if (QUEUE_IS_USING_RING_BUFFER (queue))
901         *mode = GST_BUFFERING_TIMESHIFT;
902       else
903         *mode = GST_BUFFERING_DOWNLOAD;
904     } else {
905       *mode = GST_BUFFERING_STREAM;
906     }
907   }
908
909   if (avg_in)
910     *avg_in = queue->byte_in_rate;
911   if (avg_out)
912     *avg_out = queue->byte_out_rate;
913
914   if (buffering_left) {
915     *buffering_left = (percent == 100 ? 0 : -1);
916
917     if (queue->use_rate_estimate) {
918       guint64 max, cur;
919
920       max = queue->max_level.rate_time;
921       cur = queue->cur_level.rate_time;
922
923       if (percent != 100 && max > cur)
924         *buffering_left = (max - cur) / 1000000;
925     }
926   }
927 }
928
929 static void
930 gst_queue2_post_buffering (GstQueue2 * queue)
931 {
932   GstMessage *msg = NULL;
933
934   g_mutex_lock (&queue->buffering_post_lock);
935   GST_QUEUE2_MUTEX_LOCK (queue);
936   if (queue->percent_changed) {
937     gint percent = queue->buffering_percent;
938
939     queue->percent_changed = FALSE;
940
941     GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
942     msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
943
944     gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
945         queue->avg_out, queue->buffering_left);
946   }
947   GST_QUEUE2_MUTEX_UNLOCK (queue);
948
949   if (msg != NULL)
950     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
951
952   g_mutex_unlock (&queue->buffering_post_lock);
953 }
954
955 static void
956 update_buffering (GstQueue2 * queue)
957 {
958   gint percent;
959 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
960   GstQueue2Range *range;
961
962   if (queue->read)
963     range = queue->read;
964   else
965     range = queue->current;
966 #endif
967
968   /* Ensure the variables used to calculate buffering state are up-to-date. */
969 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
970   if (range)
971     update_cur_level (queue, range);
972 #else
973   if (queue->current)
974     update_cur_level (queue, queue->current);
975 #endif
976   update_in_rates (queue);
977
978   if (!get_buffering_percent (queue, NULL, &percent))
979     return;
980
981   if (queue->is_buffering) {
982     /* if we were buffering see if we reached the high watermark */
983     if (percent >= 100)
984       queue->is_buffering = FALSE;
985
986     SET_PERCENT (queue, percent);
987   } else {
988     /* we were not buffering, check if we need to start buffering if we drop
989      * below the low threshold */
990     if (percent < queue->low_percent) {
991       queue->is_buffering = TRUE;
992       SET_PERCENT (queue, percent);
993     }
994   }
995 }
996
997 static void
998 reset_rate_timer (GstQueue2 * queue)
999 {
1000   queue->bytes_in = 0;
1001   queue->bytes_out = 0;
1002   queue->byte_in_rate = 0.0;
1003   queue->byte_in_period = 0;
1004   queue->byte_out_rate = 0.0;
1005   queue->last_in_elapsed = 0.0;
1006   queue->last_out_elapsed = 0.0;
1007   queue->in_timer_started = FALSE;
1008   queue->out_timer_started = FALSE;
1009 }
1010
1011 /* the interval in seconds to recalculate the rate */
1012 #define RATE_INTERVAL    0.2
1013 /* Tuning for rate estimation. We use a large window for the input rate because
1014  * it should be stable when connected to a network. The output rate is less
1015  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1016  * therefore adapt more quickly.
1017  * However, initial input rate may be subject to a burst, and should therefore
1018  * initially also adapt more quickly to changes, and only later on give higher
1019  * weight to previous values. */
1020 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1021 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1022
1023 static void
1024 update_in_rates (GstQueue2 * queue)
1025 {
1026   gdouble elapsed, period;
1027   gdouble byte_in_rate;
1028
1029   if (!queue->in_timer_started) {
1030     queue->in_timer_started = TRUE;
1031     g_timer_start (queue->in_timer);
1032     return;
1033   }
1034
1035   elapsed = g_timer_elapsed (queue->in_timer, NULL);
1036
1037   /* recalc after each interval. */
1038   if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1039     period = elapsed - queue->last_in_elapsed;
1040
1041     GST_DEBUG_OBJECT (queue,
1042         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1043         period, queue->bytes_in, queue->byte_in_period);
1044
1045     byte_in_rate = queue->bytes_in / period;
1046
1047     if (queue->byte_in_rate == 0.0)
1048       queue->byte_in_rate = byte_in_rate;
1049     else
1050       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1051           (double) queue->byte_in_period, period);
1052
1053     /* another data point, cap at 16 for long time running average */
1054     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1055       queue->byte_in_period += period;
1056
1057     /* reset the values to calculate rate over the next interval */
1058     queue->last_in_elapsed = elapsed;
1059     queue->bytes_in = 0;
1060   }
1061
1062   if (queue->byte_in_rate > 0.0) {
1063     queue->cur_level.rate_time =
1064         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1065   }
1066   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1067       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1068 }
1069
1070 static void
1071 update_out_rates (GstQueue2 * queue)
1072 {
1073   gdouble elapsed, period;
1074   gdouble byte_out_rate;
1075
1076   if (!queue->out_timer_started) {
1077     queue->out_timer_started = TRUE;
1078     g_timer_start (queue->out_timer);
1079     return;
1080   }
1081
1082   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1083
1084   /* recalc after each interval. */
1085   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1086     period = elapsed - queue->last_out_elapsed;
1087
1088     GST_DEBUG_OBJECT (queue,
1089         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1090
1091     byte_out_rate = queue->bytes_out / period;
1092
1093     if (queue->byte_out_rate == 0.0)
1094       queue->byte_out_rate = byte_out_rate;
1095     else
1096       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1097
1098     /* reset the values to calculate rate over the next interval */
1099     queue->last_out_elapsed = elapsed;
1100     queue->bytes_out = 0;
1101   }
1102   if (queue->byte_in_rate > 0.0) {
1103     queue->cur_level.rate_time =
1104         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1105   }
1106   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1107       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1108 }
1109
1110 static void
1111 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1112 {
1113   guint64 reading_pos, max_reading_pos;
1114
1115   reading_pos = pos;
1116   max_reading_pos = range->max_reading_pos;
1117
1118   max_reading_pos = MAX (max_reading_pos, reading_pos);
1119
1120   GST_DEBUG_OBJECT (queue,
1121       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1122       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1123   range->max_reading_pos = max_reading_pos;
1124
1125   update_cur_level (queue, range);
1126 }
1127
1128 static gboolean
1129 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1130 {
1131   GstEvent *event;
1132   gboolean res;
1133
1134   /* until we receive the FLUSH_STOP from this seek, we skip data */
1135   queue->seeking = TRUE;
1136   GST_QUEUE2_MUTEX_UNLOCK (queue);
1137
1138   debug_ranges (queue);
1139
1140   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1141
1142   event =
1143       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1144       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1145       GST_SEEK_TYPE_NONE, -1);
1146
1147   res = gst_pad_push_event (queue->sinkpad, event);
1148   GST_QUEUE2_MUTEX_LOCK (queue);
1149
1150   if (res) {
1151     /* Between us sending the seek event and re-acquiring the lock, the source
1152      * thread might already have pushed data and moved along the range's
1153      * writing_pos beyond the seek offset. In that case we don't want to set
1154      * the writing position back to the requested seek position, as it would
1155      * cause data to be written to the wrong offset in the file or ring buffer.
1156      * We still do the add_range call to switch the current range to the
1157      * requested range, or create one if one doesn't exist yet. */
1158 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1159     queue->current = queue->read = add_range (queue, offset, FALSE);
1160 #else
1161     queue->current = add_range (queue, offset, FALSE);
1162 #endif
1163   }
1164
1165   return res;
1166 }
1167
1168 /* get the threshold for when we decide to seek rather than wait */
1169 static guint64
1170 get_seek_threshold (GstQueue2 * queue)
1171 {
1172   guint64 threshold;
1173
1174   /* FIXME, find a good threshold based on the incoming rate. */
1175   threshold = 1024 * 512;
1176
1177   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1178     threshold = MIN (threshold,
1179         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1180   }
1181   return threshold;
1182 }
1183
1184 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1185 /* check the buffered data size, not to change range repeatedly.
1186  * changing range cause the new http connection and it makes buffering state frequently. */
1187 static gboolean
1188 change_current_range(GstQueue2 * queue, GstQueue2Range *req_range, guint64 offset, guint length)
1189 {
1190   guint64 curr_level = 0;
1191   guint64 curr_min_level = 0;
1192
1193   if (queue->current->writing_pos > queue->current->reading_pos)
1194     curr_level = queue->current->writing_pos - queue->current->reading_pos;
1195
1196   /* FIXME, find a good threshold based on the incoming rate and content bitrate. */
1197   curr_min_level = MIN((queue->max_level.bytes*0.05), get_seek_threshold(queue));
1198
1199   GST_DEBUG_OBJECT(queue, " curr[w%"G_GUINT64_FORMAT", r%"G_GUINT64_FORMAT
1200         ", d%"G_GUINT64_FORMAT", m%"G_GUINT64_FORMAT
1201         "] u[%"G_GUINT64_FORMAT"][EOS:%d]",
1202         queue->current->writing_pos, queue->current->reading_pos,
1203         curr_level, curr_min_level,
1204         queue->upstream_size, queue->is_eos);
1205
1206   /* FIXME, find a good threshold based on the incoming rate and content bitrate. */
1207   if ((queue->is_eos) ||
1208       (queue->upstream_size > 0 && queue->current->writing_pos >= queue->upstream_size) ||
1209       (curr_level >= curr_min_level)) {
1210     GST_DEBUG_OBJECT (queue, "Range Switching");
1211     return TRUE;
1212   } else {
1213     GST_DEBUG_OBJECT (queue, "keep receiving data without range switching.");
1214     return FALSE;
1215   }
1216 }
1217 #endif
1218
1219 /* see if there is enough data in the file to read a full buffer */
1220 static gboolean
1221 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1222 {
1223   GstQueue2Range *range;
1224 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1225   gboolean need_to_seek = TRUE;
1226 #endif
1227   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1228       offset, length);
1229
1230   if ((range = find_range (queue, offset))) {
1231 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1232     queue->read = range;
1233 #endif
1234     if (queue->current != range) {
1235       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1236 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1237       if (QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER(queue))
1238         need_to_seek = change_current_range(queue, range, offset, length);
1239
1240       if (need_to_seek == TRUE)
1241         perform_seek_to_offset (queue, range->writing_pos);
1242 #else
1243       perform_seek_to_offset (queue, range->writing_pos);
1244 #endif
1245     }
1246
1247     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1248         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1249
1250     /* we have a range for offset */
1251     GST_DEBUG_OBJECT (queue,
1252         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1253         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1254
1255     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1256       return TRUE;
1257
1258     if (offset + length <= range->writing_pos)
1259       return TRUE;
1260     else
1261       GST_DEBUG_OBJECT (queue,
1262           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1263           (offset + length) - range->writing_pos);
1264
1265   } else {
1266     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1267         " len %u", offset, length);
1268
1269 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1270     queue->read = queue->current;
1271 #endif
1272
1273     /* we don't have the range, see how far away we are */
1274     if (!queue->is_eos && queue->current) {
1275       guint64 threshold = get_seek_threshold (queue);
1276
1277       if (offset >= queue->current->offset && offset <=
1278           queue->current->writing_pos + threshold) {
1279         GST_INFO_OBJECT (queue,
1280             "requested data is within range, wait for data");
1281         return FALSE;
1282       }
1283     }
1284
1285     /* too far away, do a seek */
1286     perform_seek_to_offset (queue, offset);
1287   }
1288
1289   return FALSE;
1290 }
1291
1292 #ifdef HAVE_FSEEKO
1293 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1294 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1295 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1296 #else
1297 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1298 #endif
1299
1300 static GstFlowReturn
1301 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1302     guint8 * dst, gint64 * read_return)
1303 {
1304   guint8 *ring_buffer;
1305   size_t res;
1306
1307   ring_buffer = queue->ring_buffer;
1308
1309   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1310     goto seek_failed;
1311
1312   /* this should not block */
1313   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1314       length, offset);
1315   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1316     res = fread (dst, 1, length, queue->temp_file);
1317   } else {
1318     memcpy (dst, ring_buffer + offset, length);
1319     res = length;
1320   }
1321
1322   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1323
1324   if (G_UNLIKELY (res < length)) {
1325     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1326       goto could_not_read;
1327     /* check for errors or EOF */
1328     if (ferror (queue->temp_file))
1329       goto could_not_read;
1330     if (feof (queue->temp_file) && length > 0)
1331       goto eos;
1332   }
1333
1334   *read_return = res;
1335
1336   return GST_FLOW_OK;
1337
1338 seek_failed:
1339   {
1340     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1341     return GST_FLOW_ERROR;
1342   }
1343 could_not_read:
1344   {
1345     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1346     return GST_FLOW_ERROR;
1347   }
1348 eos:
1349   {
1350     GST_DEBUG ("non-regular file hits EOS");
1351     return GST_FLOW_EOS;
1352   }
1353 }
1354
1355 static GstFlowReturn
1356 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1357     GstBuffer ** buffer)
1358 {
1359   GstBuffer *buf;
1360   GstMapInfo info;
1361   guint8 *data;
1362   guint64 file_offset;
1363   guint block_length, remaining, read_length;
1364   guint64 rb_size;
1365   guint64 max_size;
1366   guint64 rpos;
1367   GstFlowReturn ret = GST_FLOW_OK;
1368
1369   /* allocate the output buffer of the requested size */
1370   if (*buffer == NULL)
1371     buf = gst_buffer_new_allocate (NULL, length, NULL);
1372   else
1373     buf = *buffer;
1374
1375   gst_buffer_map (buf, &info, GST_MAP_WRITE);
1376   data = info.data;
1377
1378   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1379       offset);
1380
1381   rpos = offset;
1382   rb_size = queue->ring_buffer_max_size;
1383   max_size = QUEUE_MAX_BYTES (queue);
1384
1385   remaining = length;
1386   while (remaining > 0) {
1387     /* configure how much/whether to read */
1388     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1389       read_length = 0;
1390
1391       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1392         guint64 level;
1393
1394         /* calculate how far away the offset is */
1395 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1396         if (queue->read->writing_pos > rpos)
1397           level = queue->read->writing_pos - rpos;
1398 #else
1399         if (queue->current->writing_pos > rpos)
1400           level = queue->current->writing_pos - rpos;
1401 #endif
1402         else
1403           level = 0;
1404
1405         GST_DEBUG_OBJECT (queue,
1406             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1407             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1408 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1409             rpos, queue->read->writing_pos, level, max_size);
1410 #else
1411             rpos, queue->current->writing_pos, level, max_size);
1412 #endif
1413
1414         if (level >= max_size) {
1415           /* we don't have the data but if we have a ring buffer that is full, we
1416            * need to read */
1417           GST_DEBUG_OBJECT (queue,
1418               "ring buffer full, reading QUEUE_MAX_BYTES %"
1419               G_GUINT64_FORMAT " bytes", max_size);
1420           read_length = max_size;
1421         } else if (queue->is_eos) {
1422           /* won't get any more data so read any data we have */
1423           if (level) {
1424             GST_DEBUG_OBJECT (queue,
1425                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1426                 level);
1427             read_length = level;
1428             remaining = level;
1429             length = level;
1430           } else
1431             goto hit_eos;
1432         }
1433       }
1434
1435       if (read_length == 0) {
1436         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1437 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1438           GST_DEBUG_OBJECT (queue,
1439               "update current position [%" G_GUINT64_FORMAT "-%"
1440               G_GUINT64_FORMAT "]", rpos, queue->read->max_reading_pos);
1441           update_cur_pos (queue, queue->read, rpos);
1442 #else
1443           GST_DEBUG_OBJECT (queue,
1444               "update current position [%" G_GUINT64_FORMAT "-%"
1445               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1446           update_cur_pos (queue, queue->current, rpos);
1447 #endif
1448           GST_QUEUE2_SIGNAL_DEL (queue);
1449         }
1450
1451         if (queue->use_buffering)
1452           update_buffering (queue);
1453
1454         GST_DEBUG_OBJECT (queue, "waiting for add");
1455         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1456         continue;
1457       }
1458     } else {
1459       /* we have the requested data so read it */
1460       read_length = remaining;
1461     }
1462
1463     /* set range reading_pos to actual reading position for this read */
1464 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1465     queue->read->reading_pos = rpos;
1466 #else
1467     queue->current->reading_pos = rpos;
1468 #endif
1469
1470     /* configure how much and from where to read */
1471     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1472       file_offset =
1473 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1474           (queue->read->rb_offset + (rpos -
1475               queue->read->offset)) % rb_size;
1476 #else
1477           (queue->current->rb_offset + (rpos -
1478               queue->current->offset)) % rb_size;
1479 #endif
1480       if (file_offset + read_length > rb_size) {
1481         block_length = rb_size - file_offset;
1482       } else {
1483         block_length = read_length;
1484       }
1485     } else {
1486       file_offset = rpos;
1487       block_length = read_length;
1488     }
1489
1490     /* while we still have data to read, we loop */
1491     while (read_length > 0) {
1492       gint64 read_return;
1493
1494       ret =
1495           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1496           data, &read_return);
1497       if (ret != GST_FLOW_OK)
1498         goto read_error;
1499
1500       file_offset += read_return;
1501       if (QUEUE_IS_USING_RING_BUFFER (queue))
1502         file_offset %= rb_size;
1503
1504       data += read_return;
1505       read_length -= read_return;
1506       block_length = read_length;
1507       remaining -= read_return;
1508
1509 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1510       rpos = (queue->read->reading_pos += read_return);
1511       update_cur_pos (queue, queue->read, queue->read->reading_pos);
1512 #else
1513       rpos = (queue->current->reading_pos += read_return);
1514       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1515 #endif
1516     }
1517     GST_QUEUE2_SIGNAL_DEL (queue);
1518     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1519   }
1520
1521   gst_buffer_unmap (buf, &info);
1522   gst_buffer_resize (buf, 0, length);
1523
1524   GST_BUFFER_OFFSET (buf) = offset;
1525   GST_BUFFER_OFFSET_END (buf) = offset + length;
1526
1527   *buffer = buf;
1528
1529   return ret;
1530
1531   /* ERRORS */
1532 hit_eos:
1533   {
1534     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1535     gst_buffer_unmap (buf, &info);
1536     if (*buffer == NULL)
1537       gst_buffer_unref (buf);
1538     return GST_FLOW_EOS;
1539   }
1540 out_flushing:
1541   {
1542     GST_DEBUG_OBJECT (queue, "we are flushing");
1543     gst_buffer_unmap (buf, &info);
1544     if (*buffer == NULL)
1545       gst_buffer_unref (buf);
1546     return GST_FLOW_FLUSHING;
1547   }
1548 read_error:
1549   {
1550     GST_DEBUG_OBJECT (queue, "we have a read error");
1551     gst_buffer_unmap (buf, &info);
1552     if (*buffer == NULL)
1553       gst_buffer_unref (buf);
1554     return ret;
1555   }
1556 }
1557
1558 /* should be called with QUEUE_LOCK */
1559 static GstMiniObject *
1560 gst_queue2_read_item_from_file (GstQueue2 * queue)
1561 {
1562   GstMiniObject *item;
1563
1564   if (queue->stream_start_event != NULL) {
1565     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1566     queue->stream_start_event = NULL;
1567   } else if (queue->starting_segment != NULL) {
1568     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1569     queue->starting_segment = NULL;
1570   } else {
1571     GstFlowReturn ret;
1572     GstBuffer *buffer = NULL;
1573     guint64 reading_pos;
1574
1575 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1576     reading_pos = queue->read->reading_pos;
1577 #else
1578     reading_pos = queue->current->reading_pos;
1579 #endif
1580
1581     ret =
1582         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1583         &buffer);
1584
1585     switch (ret) {
1586       case GST_FLOW_OK:
1587         item = GST_MINI_OBJECT_CAST (buffer);
1588         break;
1589       case GST_FLOW_EOS:
1590         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1591         break;
1592       default:
1593         item = NULL;
1594         break;
1595     }
1596   }
1597   return item;
1598 }
1599
1600 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1601  * the temp filename. */
1602 static gboolean
1603 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1604 {
1605   gint fd = -1;
1606   gchar *name = NULL;
1607
1608   if (queue->temp_file)
1609     goto already_opened;
1610
1611   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1612
1613   /* If temp_template was set, allocate a filename and open that filen */
1614
1615   /* nothing to do */
1616   if (queue->temp_template == NULL)
1617     goto no_directory;
1618
1619   /* make copy of the template, we don't want to change this */
1620   name = g_strdup (queue->temp_template);
1621   fd = g_mkstemp (name);
1622   if (fd == -1)
1623     goto mkstemp_failed;
1624
1625   /* open the file for update/writing */
1626   queue->temp_file = fdopen (fd, "wb+");
1627   /* error creating file */
1628   if (queue->temp_file == NULL)
1629     goto open_failed;
1630
1631   g_free (queue->temp_location);
1632   queue->temp_location = name;
1633
1634   GST_QUEUE2_MUTEX_UNLOCK (queue);
1635
1636   /* we can't emit the notify with the lock */
1637   g_object_notify (G_OBJECT (queue), "temp-location");
1638
1639   GST_QUEUE2_MUTEX_LOCK (queue);
1640
1641   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1642
1643   return TRUE;
1644
1645   /* ERRORS */
1646 already_opened:
1647   {
1648     GST_DEBUG_OBJECT (queue, "temp file was already open");
1649     return TRUE;
1650   }
1651 no_directory:
1652   {
1653     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1654         (_("No Temp directory specified.")), (NULL));
1655     return FALSE;
1656   }
1657 mkstemp_failed:
1658   {
1659     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1660         (_("Could not create temp file \"%s\"."), queue->temp_template),
1661         GST_ERROR_SYSTEM);
1662     g_free (name);
1663     return FALSE;
1664   }
1665 open_failed:
1666   {
1667     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1668         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1669     g_free (name);
1670     if (fd != -1)
1671       close (fd);
1672     return FALSE;
1673   }
1674 }
1675
1676 static void
1677 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1678 {
1679   /* nothing to do */
1680   if (queue->temp_file == NULL)
1681     return;
1682
1683   GST_DEBUG_OBJECT (queue, "closing temp file");
1684
1685   fflush (queue->temp_file);
1686   fclose (queue->temp_file);
1687
1688   if (queue->temp_remove) {
1689     if (remove (queue->temp_location) < 0) {
1690       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1691           queue->temp_location, g_strerror (errno));
1692     }
1693   }
1694
1695   queue->temp_file = NULL;
1696   clean_ranges (queue);
1697 }
1698
1699 static void
1700 gst_queue2_flush_temp_file (GstQueue2 * queue)
1701 {
1702   if (queue->temp_file == NULL)
1703     return;
1704
1705   GST_DEBUG_OBJECT (queue, "flushing temp file");
1706
1707   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1708 }
1709
1710 static void
1711 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1712 {
1713   if (!QUEUE_IS_USING_QUEUE (queue)) {
1714     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1715       gst_queue2_flush_temp_file (queue);
1716     init_ranges (queue);
1717   } else {
1718     while (!g_queue_is_empty (&queue->queue)) {
1719       GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1720
1721       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1722           && GST_EVENT_IS_STICKY (qitem->item)
1723           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1724           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1725         gst_pad_store_sticky_event (queue->srcpad,
1726             GST_EVENT_CAST (qitem->item));
1727       }
1728
1729       /* Then lose another reference because we are supposed to destroy that
1730          data when flushing */
1731       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1732         gst_mini_object_unref (qitem->item);
1733       g_slice_free (GstQueue2Item, qitem);
1734     }
1735   }
1736   queue->last_query = FALSE;
1737   g_cond_signal (&queue->query_handled);
1738   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1739   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1740   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1741   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1742   queue->sink_tainted = queue->src_tainted = TRUE;
1743   if (queue->starting_segment != NULL)
1744     gst_event_unref (queue->starting_segment);
1745   queue->starting_segment = NULL;
1746   queue->segment_event_received = FALSE;
1747   gst_event_replace (&queue->stream_start_event, NULL);
1748
1749   /* we deleted a lot of something */
1750   GST_QUEUE2_SIGNAL_DEL (queue);
1751 }
1752
1753 static gboolean
1754 gst_queue2_wait_free_space (GstQueue2 * queue)
1755 {
1756   /* We make space available if we're "full" according to whatever
1757    * the user defined as "full". */
1758   if (gst_queue2_is_filled (queue)) {
1759     gboolean started;
1760
1761     /* pause the timer while we wait. The fact that we are waiting does not mean
1762      * the byterate on the input pad is lower */
1763     if ((started = queue->in_timer_started))
1764       g_timer_stop (queue->in_timer);
1765
1766     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1767         "queue is full, waiting for free space");
1768     do {
1769       /* Wait for space to be available, we could be unlocked because of a flush. */
1770       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1771     }
1772     while (gst_queue2_is_filled (queue));
1773
1774     /* and continue if we were running before */
1775     if (started)
1776       g_timer_continue (queue->in_timer);
1777   }
1778   return TRUE;
1779
1780   /* ERRORS */
1781 out_flushing:
1782   {
1783     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1784     return FALSE;
1785   }
1786 }
1787
1788 static gboolean
1789 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1790 {
1791   GstMapInfo info;
1792   guint8 *data, *ring_buffer;
1793   guint size, rb_size;
1794   guint64 writing_pos, new_writing_pos;
1795   GstQueue2Range *range, *prev, *next;
1796   gboolean do_seek = FALSE;
1797
1798   if (QUEUE_IS_USING_RING_BUFFER (queue))
1799     writing_pos = queue->current->rb_writing_pos;
1800   else
1801     writing_pos = queue->current->writing_pos;
1802   ring_buffer = queue->ring_buffer;
1803   rb_size = queue->ring_buffer_max_size;
1804
1805   gst_buffer_map (buffer, &info, GST_MAP_READ);
1806
1807   size = info.size;
1808   data = info.data;
1809
1810   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1811       writing_pos);
1812
1813   /* sanity check */
1814   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1815       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1816     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1817         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1818         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1819   }
1820
1821   while (size > 0) {
1822     guint to_write;
1823
1824     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1825       gint64 space;
1826
1827       /* calculate the space in the ring buffer not used by data from
1828        * the current range */
1829       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1830         /* wait until there is some free space */
1831         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1832       }
1833       /* get the amount of space we have */
1834       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1835
1836       /* calculate if we need to split or if we can write the entire
1837        * buffer now */
1838       to_write = MIN (size, space);
1839
1840       /* the writing position in the ring buffer after writing (part
1841        * or all of) the buffer */
1842       new_writing_pos = (writing_pos + to_write) % rb_size;
1843
1844       prev = NULL;
1845       range = queue->ranges;
1846
1847       /* if we need to overwrite data in the ring buffer, we need to
1848        * update the ranges
1849        *
1850        * warning: this code is complicated and includes some
1851        * simplifications - pen, paper and diagrams for the cases
1852        * recommended! */
1853       while (range) {
1854         guint64 range_data_start, range_data_end;
1855         GstQueue2Range *range_to_destroy = NULL;
1856
1857         range_data_start = range->rb_offset;
1858         range_data_end = range->rb_writing_pos;
1859
1860         /* handle the special case where the range has no data in it */
1861         if (range->writing_pos == range->offset) {
1862           if (range != queue->current) {
1863             GST_DEBUG_OBJECT (queue,
1864                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1865                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1866             /* remove range */
1867             range_to_destroy = range;
1868             if (prev)
1869               prev->next = range->next;
1870           }
1871           goto next_range;
1872         }
1873
1874         if (range_data_end > range_data_start) {
1875           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1876             goto next_range;
1877
1878           if (new_writing_pos > range_data_start) {
1879             if (new_writing_pos >= range_data_end) {
1880               GST_DEBUG_OBJECT (queue,
1881                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1882                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1883               /* remove range */
1884               range_to_destroy = range;
1885               if (prev)
1886                 prev->next = range->next;
1887             } else {
1888               GST_DEBUG_OBJECT (queue,
1889                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1890                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1891                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1892                   range->offset + new_writing_pos - range_data_start,
1893                   new_writing_pos);
1894               range->offset += (new_writing_pos - range_data_start);
1895               range->rb_offset = new_writing_pos;
1896             }
1897           }
1898         } else {
1899           guint64 new_wpos_virt = writing_pos + to_write;
1900
1901           if (new_wpos_virt <= range_data_start)
1902             goto next_range;
1903
1904           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1905             GST_DEBUG_OBJECT (queue,
1906                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1907                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1908             /* remove range */
1909             range_to_destroy = range;
1910             if (prev)
1911               prev->next = range->next;
1912           } else {
1913             GST_DEBUG_OBJECT (queue,
1914                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1915                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1916                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1917                 range->offset + new_writing_pos - range_data_start,
1918                 new_writing_pos);
1919             range->offset += (new_wpos_virt - range_data_start);
1920             range->rb_offset = new_writing_pos;
1921           }
1922         }
1923
1924       next_range:
1925         if (!range_to_destroy)
1926           prev = range;
1927
1928         range = range->next;
1929         if (range_to_destroy) {
1930           if (range_to_destroy == queue->ranges)
1931             queue->ranges = range;
1932           g_slice_free (GstQueue2Range, range_to_destroy);
1933           range_to_destroy = NULL;
1934         }
1935       }
1936     } else {
1937       to_write = size;
1938       new_writing_pos = writing_pos + to_write;
1939     }
1940
1941     if (QUEUE_IS_USING_TEMP_FILE (queue)
1942         && FSEEK_FILE (queue->temp_file, writing_pos))
1943       goto seek_failed;
1944
1945     if (new_writing_pos > writing_pos) {
1946       GST_INFO_OBJECT (queue,
1947           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1948           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
1949           queue->current->writing_pos, queue->current->rb_writing_pos);
1950       /* either not using ring buffer or no wrapping, just write */
1951       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1952         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
1953           goto handle_error;
1954       } else {
1955         memcpy (ring_buffer + writing_pos, data, to_write);
1956       }
1957
1958       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1959         /* try to merge with next range */
1960         while ((next = queue->current->next)) {
1961           GST_INFO_OBJECT (queue,
1962               "checking merge with next range %" G_GUINT64_FORMAT " < %"
1963               G_GUINT64_FORMAT, new_writing_pos, next->offset);
1964           if (new_writing_pos < next->offset)
1965             break;
1966
1967           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
1968               next->writing_pos);
1969
1970           /* remove the group */
1971           queue->current->next = next->next;
1972
1973           /* We use the threshold to decide if we want to do a seek or simply
1974            * read the data again. If there is not so much data in the range we
1975            * prefer to avoid to seek and read it again. */
1976           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
1977             /* the new range had more data than the threshold, it's worth keeping
1978              * it and doing a seek. */
1979             new_writing_pos = next->writing_pos;
1980             do_seek = TRUE;
1981           }
1982           g_slice_free (GstQueue2Range, next);
1983         }
1984         goto update_and_signal;
1985       }
1986     } else {
1987       /* wrapping */
1988       guint block_one, block_two;
1989
1990       block_one = rb_size - writing_pos;
1991       block_two = to_write - block_one;
1992
1993       if (block_one > 0) {
1994         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
1995         /* write data to end of ring buffer */
1996         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1997           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
1998             goto handle_error;
1999         } else {
2000           memcpy (ring_buffer + writing_pos, data, block_one);
2001         }
2002       }
2003
2004       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
2005         goto seek_failed;
2006
2007       if (block_two > 0) {
2008         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
2009         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2010           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
2011             goto handle_error;
2012         } else {
2013           memcpy (ring_buffer, data + block_one, block_two);
2014         }
2015       }
2016     }
2017
2018   update_and_signal:
2019     /* update the writing positions */
2020     size -= to_write;
2021     GST_INFO_OBJECT (queue,
2022         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
2023         to_write, writing_pos, size);
2024
2025     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2026       data += to_write;
2027       queue->current->writing_pos += to_write;
2028       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
2029     } else {
2030       queue->current->writing_pos = writing_pos = new_writing_pos;
2031 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
2032       if (do_seek) {
2033         if (queue->upstream_size == 0 || new_writing_pos < queue->upstream_size)
2034           perform_seek_to_offset (queue, new_writing_pos);
2035         else
2036           queue->is_eos = TRUE;
2037       }
2038 #endif
2039     }
2040
2041 #ifndef TIZEN_FEATURE_QUEUE2_MODIFICATION
2042     if (do_seek)
2043       perform_seek_to_offset (queue, new_writing_pos);
2044 #endif
2045
2046     update_cur_level (queue, queue->current);
2047
2048     /* update the buffering status */
2049     if (queue->use_buffering)
2050       update_buffering (queue);
2051
2052     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2053         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2054
2055     GST_QUEUE2_SIGNAL_ADD (queue);
2056   }
2057
2058   gst_buffer_unmap (buffer, &info);
2059
2060   return TRUE;
2061
2062   /* ERRORS */
2063 out_flushing:
2064   {
2065     GST_DEBUG_OBJECT (queue, "we are flushing");
2066     gst_buffer_unmap (buffer, &info);
2067     /* FIXME - GST_FLOW_EOS ? */
2068     return FALSE;
2069   }
2070 seek_failed:
2071   {
2072     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2073     gst_buffer_unmap (buffer, &info);
2074     return FALSE;
2075   }
2076 handle_error:
2077   {
2078     switch (errno) {
2079       case ENOSPC:{
2080         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2081         break;
2082       }
2083       default:{
2084         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2085             (_("Error while writing to download file.")),
2086             ("%s", g_strerror (errno)));
2087       }
2088     }
2089     gst_buffer_unmap (buffer, &info);
2090     return FALSE;
2091   }
2092 }
2093
2094 static gboolean
2095 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2096 {
2097   GstQueue2 *queue = q;
2098
2099   GST_TRACE_OBJECT (queue,
2100       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2101       gst_buffer_get_size (*buf));
2102
2103   if (!gst_queue2_create_write (queue, *buf)) {
2104     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2105     return FALSE;
2106   }
2107   return TRUE;
2108 }
2109
2110 static gboolean
2111 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
2112 {
2113   guint *p_size = data;
2114   gsize buf_size;
2115
2116   buf_size = gst_buffer_get_size (*buf);
2117   GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
2118   *p_size += buf_size;
2119   return TRUE;
2120 }
2121
2122 /* enqueue an item an update the level stats */
2123 static void
2124 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2125     GstQueue2ItemType item_type)
2126 {
2127   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2128     GstBuffer *buffer;
2129     guint size;
2130
2131     buffer = GST_BUFFER_CAST (item);
2132     size = gst_buffer_get_size (buffer);
2133
2134     /* add buffer to the statistics */
2135     if (QUEUE_IS_USING_QUEUE (queue)) {
2136       queue->cur_level.buffers++;
2137       queue->cur_level.bytes += size;
2138     }
2139     queue->bytes_in += size;
2140
2141     /* apply new buffer to segment stats */
2142     apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
2143     /* update the byterate stats */
2144     update_in_rates (queue);
2145
2146     if (!QUEUE_IS_USING_QUEUE (queue)) {
2147       /* FIXME - check return value? */
2148       gst_queue2_create_write (queue, buffer);
2149     }
2150   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2151     GstBufferList *buffer_list;
2152     guint size = 0;
2153
2154     buffer_list = GST_BUFFER_LIST_CAST (item);
2155
2156     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2157     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2158
2159     /* add buffer to the statistics */
2160     if (QUEUE_IS_USING_QUEUE (queue)) {
2161       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2162       queue->cur_level.bytes += size;
2163     }
2164     queue->bytes_in += size;
2165
2166     /* apply new buffer to segment stats */
2167     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2168
2169     /* update the byterate stats */
2170     update_in_rates (queue);
2171
2172     if (!QUEUE_IS_USING_QUEUE (queue)) {
2173       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2174     }
2175   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2176     GstEvent *event;
2177
2178     event = GST_EVENT_CAST (item);
2179
2180     switch (GST_EVENT_TYPE (event)) {
2181       case GST_EVENT_EOS:
2182         /* Zero the thresholds, this makes sure the queue is completely
2183          * filled and we can read all data from the queue. */
2184         GST_DEBUG_OBJECT (queue, "we have EOS");
2185         queue->is_eos = TRUE;
2186         break;
2187       case GST_EVENT_SEGMENT:
2188         apply_segment (queue, event, &queue->sink_segment, TRUE);
2189         /* This is our first new segment, we hold it
2190          * as we can't save it on the temp file */
2191         if (!QUEUE_IS_USING_QUEUE (queue)) {
2192           if (queue->segment_event_received)
2193             goto unexpected_event;
2194
2195           queue->segment_event_received = TRUE;
2196           if (queue->starting_segment != NULL)
2197             gst_event_unref (queue->starting_segment);
2198           queue->starting_segment = event;
2199           item = NULL;
2200         }
2201         /* a new segment allows us to accept more buffers if we got EOS
2202          * from downstream */
2203         queue->unexpected = FALSE;
2204         break;
2205       case GST_EVENT_GAP:
2206         apply_gap (queue, event, &queue->sink_segment, TRUE);
2207         break;
2208       case GST_EVENT_STREAM_START:
2209         if (!QUEUE_IS_USING_QUEUE (queue)) {
2210           gst_event_replace (&queue->stream_start_event, event);
2211           gst_event_unref (event);
2212           item = NULL;
2213         }
2214         break;
2215       case GST_EVENT_CAPS:{
2216         GstCaps *caps;
2217
2218         gst_event_parse_caps (event, &caps);
2219         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2220
2221         if (!QUEUE_IS_USING_QUEUE (queue)) {
2222           GST_LOG ("Dropping caps event, not using queue");
2223           gst_event_unref (event);
2224           item = NULL;
2225         }
2226         break;
2227       }
2228       default:
2229         if (!QUEUE_IS_USING_QUEUE (queue))
2230           goto unexpected_event;
2231         break;
2232     }
2233   } else if (GST_IS_QUERY (item)) {
2234     /* Can't happen as we check that in the caller */
2235     if (!QUEUE_IS_USING_QUEUE (queue))
2236       g_assert_not_reached ();
2237   } else {
2238     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2239         item, GST_OBJECT_NAME (queue));
2240     /* we can't really unref since we don't know what it is */
2241     item = NULL;
2242   }
2243
2244   if (item) {
2245     /* update the buffering status */
2246     if (queue->use_buffering)
2247       update_buffering (queue);
2248
2249     if (QUEUE_IS_USING_QUEUE (queue)) {
2250       GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2251       qitem->type = item_type;
2252       qitem->item = item;
2253       g_queue_push_tail (&queue->queue, qitem);
2254     } else {
2255       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2256     }
2257
2258     GST_QUEUE2_SIGNAL_ADD (queue);
2259   }
2260
2261   return;
2262
2263   /* ERRORS */
2264 unexpected_event:
2265   {
2266     g_warning
2267         ("Unexpected event of kind %s can't be added in temp file of queue %s ",
2268         gst_event_type_get_name (GST_EVENT_TYPE (item)),
2269         GST_OBJECT_NAME (queue));
2270     gst_event_unref (GST_EVENT_CAST (item));
2271     return;
2272   }
2273 }
2274
2275 /* dequeue an item from the queue and update level stats */
2276 static GstMiniObject *
2277 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2278 {
2279   GstMiniObject *item;
2280
2281   if (!QUEUE_IS_USING_QUEUE (queue)) {
2282     item = gst_queue2_read_item_from_file (queue);
2283   } else {
2284     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2285
2286     if (qitem == NULL)
2287       goto no_item;
2288
2289     item = qitem->item;
2290     g_slice_free (GstQueue2Item, qitem);
2291   }
2292
2293   if (item == NULL)
2294     goto no_item;
2295
2296   if (GST_IS_BUFFER (item)) {
2297     GstBuffer *buffer;
2298     guint size;
2299
2300     buffer = GST_BUFFER_CAST (item);
2301     size = gst_buffer_get_size (buffer);
2302     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2303
2304     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2305         "retrieved buffer %p from queue", buffer);
2306
2307     if (QUEUE_IS_USING_QUEUE (queue)) {
2308       queue->cur_level.buffers--;
2309       queue->cur_level.bytes -= size;
2310     }
2311     queue->bytes_out += size;
2312
2313     apply_buffer (queue, buffer, &queue->src_segment, FALSE);
2314     /* update the byterate stats */
2315     update_out_rates (queue);
2316     /* update the buffering */
2317     if (queue->use_buffering)
2318       update_buffering (queue);
2319
2320   } else if (GST_IS_EVENT (item)) {
2321     GstEvent *event = GST_EVENT_CAST (item);
2322
2323     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2324
2325     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2326         "retrieved event %p from queue", event);
2327
2328     switch (GST_EVENT_TYPE (event)) {
2329       case GST_EVENT_EOS:
2330         /* queue is empty now that we dequeued the EOS */
2331         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2332         break;
2333       case GST_EVENT_SEGMENT:
2334         apply_segment (queue, event, &queue->src_segment, FALSE);
2335         break;
2336       case GST_EVENT_GAP:
2337         apply_gap (queue, event, &queue->src_segment, FALSE);
2338         break;
2339       default:
2340         break;
2341     }
2342   } else if (GST_IS_BUFFER_LIST (item)) {
2343     GstBufferList *buffer_list;
2344     guint size = 0;
2345
2346     buffer_list = GST_BUFFER_LIST_CAST (item);
2347     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2348     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2349
2350     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2351         "retrieved buffer list %p from queue", buffer_list);
2352
2353     if (QUEUE_IS_USING_QUEUE (queue)) {
2354       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2355       queue->cur_level.bytes -= size;
2356     }
2357     queue->bytes_out += size;
2358
2359     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2360     /* update the byterate stats */
2361     update_out_rates (queue);
2362     /* update the buffering */
2363     if (queue->use_buffering)
2364       update_buffering (queue);
2365   } else if (GST_IS_QUERY (item)) {
2366     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2367         "retrieved query %p from queue", item);
2368     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2369   } else {
2370     g_warning
2371         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2372         item, GST_OBJECT_NAME (queue));
2373     item = NULL;
2374     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2375   }
2376   GST_QUEUE2_SIGNAL_DEL (queue);
2377
2378   return item;
2379
2380   /* ERRORS */
2381 no_item:
2382   {
2383     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2384     return NULL;
2385   }
2386 }
2387
2388 static gboolean
2389 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2390     GstEvent * event)
2391 {
2392   gboolean ret = TRUE;
2393   GstQueue2 *queue;
2394
2395   queue = GST_QUEUE2 (parent);
2396
2397   switch (GST_EVENT_TYPE (event)) {
2398     case GST_EVENT_FLUSH_START:
2399     {
2400       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2401       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2402         /* forward event */
2403         ret = gst_pad_push_event (queue->srcpad, event);
2404
2405         /* now unblock the chain function */
2406         GST_QUEUE2_MUTEX_LOCK (queue);
2407         queue->srcresult = GST_FLOW_FLUSHING;
2408         queue->sinkresult = GST_FLOW_FLUSHING;
2409         /* unblock the loop and chain functions */
2410         GST_QUEUE2_SIGNAL_ADD (queue);
2411         GST_QUEUE2_SIGNAL_DEL (queue);
2412         queue->last_query = FALSE;
2413         g_cond_signal (&queue->query_handled);
2414         GST_QUEUE2_MUTEX_UNLOCK (queue);
2415
2416         /* make sure it pauses, this should happen since we sent
2417          * flush_start downstream. */
2418         gst_pad_pause_task (queue->srcpad);
2419         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2420       } else {
2421         GST_QUEUE2_MUTEX_LOCK (queue);
2422         /* flush the sink pad */
2423         queue->sinkresult = GST_FLOW_FLUSHING;
2424         GST_QUEUE2_SIGNAL_DEL (queue);
2425         queue->last_query = FALSE;
2426         g_cond_signal (&queue->query_handled);
2427         GST_QUEUE2_MUTEX_UNLOCK (queue);
2428
2429         gst_event_unref (event);
2430       }
2431       break;
2432     }
2433     case GST_EVENT_FLUSH_STOP:
2434     {
2435       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2436
2437       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2438         /* forward event */
2439         ret = gst_pad_push_event (queue->srcpad, event);
2440
2441         GST_QUEUE2_MUTEX_LOCK (queue);
2442         gst_queue2_locked_flush (queue, FALSE, TRUE);
2443         queue->srcresult = GST_FLOW_OK;
2444         queue->sinkresult = GST_FLOW_OK;
2445         queue->is_eos = FALSE;
2446         queue->unexpected = FALSE;
2447         queue->seeking = FALSE;
2448         /* reset rate counters */
2449         reset_rate_timer (queue);
2450         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2451             queue->srcpad, NULL);
2452         GST_QUEUE2_MUTEX_UNLOCK (queue);
2453       } else {
2454         GST_QUEUE2_MUTEX_LOCK (queue);
2455         queue->segment_event_received = FALSE;
2456         queue->is_eos = FALSE;
2457         queue->unexpected = FALSE;
2458         queue->sinkresult = GST_FLOW_OK;
2459         queue->seeking = FALSE;
2460         GST_QUEUE2_MUTEX_UNLOCK (queue);
2461
2462         gst_event_unref (event);
2463       }
2464       break;
2465     }
2466     default:
2467       if (GST_EVENT_IS_SERIALIZED (event)) {
2468         /* serialized events go in the queue */
2469         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2470         if (queue->srcresult != GST_FLOW_OK) {
2471           /* Errors in sticky event pushing are no problem and ignored here
2472            * as they will cause more meaningful errors during data flow.
2473            * For EOS events, that are not followed by data flow, we still
2474            * return FALSE here though and report an error.
2475            */
2476           if (!GST_EVENT_IS_STICKY (event)) {
2477             goto out_flow_error;
2478           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2479             if (queue->srcresult == GST_FLOW_NOT_LINKED
2480                 || queue->srcresult < GST_FLOW_EOS) {
2481               GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2482                   (_("Internal data flow error.")),
2483                   ("streaming task paused, reason %s (%d)",
2484                       gst_flow_get_name (queue->srcresult), queue->srcresult));
2485             }
2486             goto out_flow_error;
2487           }
2488         }
2489         /* refuse more events on EOS */
2490         if (queue->is_eos)
2491           goto out_eos;
2492         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2493         GST_QUEUE2_MUTEX_UNLOCK (queue);
2494         gst_queue2_post_buffering (queue);
2495       } else {
2496         /* non-serialized events are passed upstream. */
2497         ret = gst_pad_push_event (queue->srcpad, event);
2498       }
2499       break;
2500   }
2501   return ret;
2502
2503   /* ERRORS */
2504 out_flushing:
2505   {
2506     GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2507     GST_QUEUE2_MUTEX_UNLOCK (queue);
2508     gst_event_unref (event);
2509     return FALSE;
2510   }
2511 out_eos:
2512   {
2513     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2514     GST_QUEUE2_MUTEX_UNLOCK (queue);
2515     gst_event_unref (event);
2516     return FALSE;
2517   }
2518 out_flow_error:
2519   {
2520     GST_LOG_OBJECT (queue,
2521         "refusing event, we have a downstream flow error: %s",
2522         gst_flow_get_name (queue->srcresult));
2523     GST_QUEUE2_MUTEX_UNLOCK (queue);
2524     gst_event_unref (event);
2525     return FALSE;
2526   }
2527 }
2528
2529 static gboolean
2530 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2531     GstQuery * query)
2532 {
2533   GstQueue2 *queue;
2534   gboolean res;
2535
2536   queue = GST_QUEUE2 (parent);
2537
2538   switch (GST_QUERY_TYPE (query)) {
2539     default:
2540       if (GST_QUERY_IS_SERIALIZED (query)) {
2541         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2542         /* serialized events go in the queue. We need to be certain that we
2543          * don't cause deadlocks waiting for the query return value. We check if
2544          * the queue is empty (nothing is blocking downstream and the query can
2545          * be pushed for sure) or we are not buffering. If we are buffering,
2546          * the pipeline waits to unblock downstream until our queue fills up
2547          * completely, which can not happen if we block on the query..
2548          * Therefore we only potentially block when we are not buffering. */
2549         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2550         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2551                 || !queue->use_buffering)) {
2552           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2553             gst_queue2_locked_enqueue (queue, query,
2554                 GST_QUEUE2_ITEM_TYPE_QUERY);
2555
2556             STATUS (queue, queue->sinkpad, "wait for QUERY");
2557             g_cond_wait (&queue->query_handled, &queue->qlock);
2558             if (queue->sinkresult != GST_FLOW_OK)
2559               goto out_flushing;
2560             res = queue->last_query;
2561           } else {
2562             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2563             res = FALSE;
2564           }
2565         } else {
2566           GST_DEBUG_OBJECT (queue,
2567               "refusing query, we are not using the queue");
2568           res = FALSE;
2569         }
2570         GST_QUEUE2_MUTEX_UNLOCK (queue);
2571         gst_queue2_post_buffering (queue);
2572       } else {
2573         res = gst_pad_query_default (pad, parent, query);
2574       }
2575       break;
2576   }
2577   return res;
2578
2579   /* ERRORS */
2580 out_flushing:
2581   {
2582     GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
2583     GST_QUEUE2_MUTEX_UNLOCK (queue);
2584     return FALSE;
2585   }
2586 }
2587
2588 static gboolean
2589 gst_queue2_is_empty (GstQueue2 * queue)
2590 {
2591   /* never empty on EOS */
2592   if (queue->is_eos)
2593     return FALSE;
2594
2595   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2596     return queue->current->writing_pos <= queue->current->max_reading_pos;
2597   } else {
2598     if (queue->queue.length == 0)
2599       return TRUE;
2600   }
2601
2602   return FALSE;
2603 }
2604
2605 static gboolean
2606 gst_queue2_is_filled (GstQueue2 * queue)
2607 {
2608   gboolean res;
2609
2610   /* always filled on EOS */
2611   if (queue->is_eos)
2612     return TRUE;
2613
2614 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2615     (queue->cur_level.format) >= ((alt_max) ? \
2616       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2617
2618   /* if using a ring buffer we're filled if all ring buffer space is used
2619    * _by the current range_ */
2620   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2621     guint64 rb_size = queue->ring_buffer_max_size;
2622     GST_DEBUG_OBJECT (queue,
2623         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2624         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2625     return CHECK_FILLED (bytes, rb_size);
2626   }
2627
2628   /* if using file, we're never filled if we don't have EOS */
2629   if (QUEUE_IS_USING_TEMP_FILE (queue))
2630     return FALSE;
2631
2632   /* we are never filled when we have no buffers at all */
2633   if (queue->cur_level.buffers == 0)
2634     return FALSE;
2635
2636   /* we are filled if one of the current levels exceeds the max */
2637   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2638       || CHECK_FILLED (time, 0);
2639
2640   /* if we need to, use the rate estimate to check against the max time we are
2641    * allowed to queue */
2642   if (queue->use_rate_estimate)
2643     res |= CHECK_FILLED (rate_time, 0);
2644
2645 #undef CHECK_FILLED
2646   return res;
2647 }
2648
2649 static GstFlowReturn
2650 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2651     GstMiniObject * item, GstQueue2ItemType item_type)
2652 {
2653   /* we have to lock the queue since we span threads */
2654   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2655   /* when we received EOS, we refuse more data */
2656   if (queue->is_eos)
2657     goto out_eos;
2658   /* when we received unexpected from downstream, refuse more buffers */
2659   if (queue->unexpected)
2660     goto out_unexpected;
2661
2662   /* while we didn't receive the newsegment, we're seeking and we skip data */
2663   if (queue->seeking)
2664     goto out_seeking;
2665
2666   if (!gst_queue2_wait_free_space (queue))
2667     goto out_flushing;
2668
2669   /* put buffer in queue now */
2670   gst_queue2_locked_enqueue (queue, item, item_type);
2671   GST_QUEUE2_MUTEX_UNLOCK (queue);
2672   gst_queue2_post_buffering (queue);
2673
2674   return GST_FLOW_OK;
2675
2676   /* special conditions */
2677 out_flushing:
2678   {
2679     GstFlowReturn ret = queue->sinkresult;
2680
2681     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2682         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2683     GST_QUEUE2_MUTEX_UNLOCK (queue);
2684     gst_mini_object_unref (item);
2685
2686     return ret;
2687   }
2688 out_eos:
2689   {
2690     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2691     GST_QUEUE2_MUTEX_UNLOCK (queue);
2692     gst_mini_object_unref (item);
2693
2694     return GST_FLOW_EOS;
2695   }
2696 out_seeking:
2697   {
2698     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2699     GST_QUEUE2_MUTEX_UNLOCK (queue);
2700     gst_mini_object_unref (item);
2701
2702     return GST_FLOW_OK;
2703   }
2704 out_unexpected:
2705   {
2706     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2707     GST_QUEUE2_MUTEX_UNLOCK (queue);
2708     gst_mini_object_unref (item);
2709
2710     return GST_FLOW_EOS;
2711   }
2712 }
2713
2714 static GstFlowReturn
2715 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2716 {
2717   GstQueue2 *queue;
2718
2719   queue = GST_QUEUE2 (parent);
2720
2721   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2722       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2723       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2724       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2725       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2726
2727   return gst_queue2_chain_buffer_or_buffer_list (queue,
2728       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2729 }
2730
2731 static GstFlowReturn
2732 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2733     GstBufferList * buffer_list)
2734 {
2735   GstQueue2 *queue;
2736
2737   queue = GST_QUEUE2 (parent);
2738
2739   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2740       "received buffer list %p", buffer_list);
2741
2742   return gst_queue2_chain_buffer_or_buffer_list (queue,
2743       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2744 }
2745
2746 static GstMiniObject *
2747 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2748 {
2749   GstMiniObject *data;
2750
2751   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2752
2753   /* stop pushing buffers, we dequeue all items until we see an item that we
2754    * can push again, which is EOS or SEGMENT. If there is nothing in the
2755    * queue we can push, we set a flag to make the sinkpad refuse more
2756    * buffers with an EOS return value until we receive something
2757    * pushable again or we get flushed. */
2758   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2759     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2760       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2761           "dropping EOS buffer %p", data);
2762       gst_buffer_unref (GST_BUFFER_CAST (data));
2763     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2764       GstEvent *event = GST_EVENT_CAST (data);
2765       GstEventType type = GST_EVENT_TYPE (event);
2766
2767       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2768         /* we found a pushable item in the queue, push it out */
2769         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2770             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2771         return data;
2772       }
2773       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2774           "dropping EOS event %p", event);
2775       gst_event_unref (event);
2776     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2777       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2778           "dropping EOS buffer list %p", data);
2779       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2780     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2781       queue->last_query = FALSE;
2782       g_cond_signal (&queue->query_handled);
2783       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2784     }
2785   }
2786   /* no more items in the queue. Set the unexpected flag so that upstream
2787    * make us refuse any more buffers on the sinkpad. Since we will still
2788    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2789    * task function does not shut down. */
2790   queue->unexpected = TRUE;
2791   return NULL;
2792 }
2793
2794 /* dequeue an item from the queue an push it downstream. This functions returns
2795  * the result of the push. */
2796 static GstFlowReturn
2797 gst_queue2_push_one (GstQueue2 * queue)
2798 {
2799   GstFlowReturn result = queue->srcresult;
2800   GstMiniObject *data;
2801   GstQueue2ItemType item_type;
2802
2803   data = gst_queue2_locked_dequeue (queue, &item_type);
2804   if (data == NULL)
2805     goto no_item;
2806
2807 next:
2808   STATUS (queue, queue->srcpad, "We have something dequeud");
2809   g_atomic_int_set (&queue->downstream_may_block,
2810       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2811       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2812   GST_QUEUE2_MUTEX_UNLOCK (queue);
2813   gst_queue2_post_buffering (queue);
2814
2815   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2816     GstBuffer *buffer;
2817
2818     buffer = GST_BUFFER_CAST (data);
2819
2820     result = gst_pad_push (queue->srcpad, buffer);
2821     g_atomic_int_set (&queue->downstream_may_block, 0);
2822
2823     /* need to check for srcresult here as well */
2824     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2825     if (result == GST_FLOW_EOS) {
2826       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2827       if (data != NULL)
2828         goto next;
2829       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2830        * to the caller so that the task function does not shut down */
2831       result = GST_FLOW_OK;
2832     }
2833   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2834     GstEvent *event = GST_EVENT_CAST (data);
2835     GstEventType type = GST_EVENT_TYPE (event);
2836
2837     gst_pad_push_event (queue->srcpad, event);
2838
2839     /* if we're EOS, return EOS so that the task pauses. */
2840     if (type == GST_EVENT_EOS) {
2841       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2842           "pushed EOS event %p, return EOS", event);
2843       result = GST_FLOW_EOS;
2844     }
2845
2846     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2847   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2848     GstBufferList *buffer_list;
2849
2850     buffer_list = GST_BUFFER_LIST_CAST (data);
2851
2852     result = gst_pad_push_list (queue->srcpad, buffer_list);
2853     g_atomic_int_set (&queue->downstream_may_block, 0);
2854
2855     /* need to check for srcresult here as well */
2856     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2857     if (result == GST_FLOW_EOS) {
2858       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2859       if (data != NULL)
2860         goto next;
2861       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2862        * to the caller so that the task function does not shut down */
2863       result = GST_FLOW_OK;
2864     }
2865   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2866     GstQuery *query = GST_QUERY_CAST (data);
2867
2868     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2869     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2870     GST_LOG_OBJECT (queue->srcpad, "Peered query");
2871     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2872         "did query %p, return %d", query, queue->last_query);
2873     g_cond_signal (&queue->query_handled);
2874     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2875     result = GST_FLOW_OK;
2876   }
2877   return result;
2878
2879   /* ERRORS */
2880 no_item:
2881   {
2882     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2883         "exit because we have no item in the queue");
2884     return GST_FLOW_ERROR;
2885   }
2886 out_flushing:
2887   {
2888     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2889     return GST_FLOW_FLUSHING;
2890   }
2891 }
2892
2893 /* called repeatedly with @pad as the source pad. This function should push out
2894  * data to the peer element. */
2895 static void
2896 gst_queue2_loop (GstPad * pad)
2897 {
2898   GstQueue2 *queue;
2899   GstFlowReturn ret;
2900
2901   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2902
2903   /* have to lock for thread-safety */
2904   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2905
2906   if (gst_queue2_is_empty (queue)) {
2907     gboolean started;
2908
2909     /* pause the timer while we wait. The fact that we are waiting does not mean
2910      * the byterate on the output pad is lower */
2911     if ((started = queue->out_timer_started))
2912       g_timer_stop (queue->out_timer);
2913
2914     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2915         "queue is empty, waiting for new data");
2916     do {
2917       /* Wait for data to be available, we could be unlocked because of a flush. */
2918       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2919     }
2920     while (gst_queue2_is_empty (queue));
2921
2922     /* and continue if we were running before */
2923     if (started)
2924       g_timer_continue (queue->out_timer);
2925   }
2926   ret = gst_queue2_push_one (queue);
2927   queue->srcresult = ret;
2928   queue->sinkresult = ret;
2929   if (ret != GST_FLOW_OK)
2930     goto out_flushing;
2931
2932   GST_QUEUE2_MUTEX_UNLOCK (queue);
2933   gst_queue2_post_buffering (queue);
2934
2935   return;
2936
2937   /* ERRORS */
2938 out_flushing:
2939   {
2940     gboolean eos = queue->is_eos;
2941     GstFlowReturn ret = queue->srcresult;
2942
2943     gst_pad_pause_task (queue->srcpad);
2944     if (ret == GST_FLOW_FLUSHING) {
2945       gst_queue2_locked_flush (queue, FALSE, FALSE);
2946     } else {
2947       GST_QUEUE2_SIGNAL_DEL (queue);
2948       queue->last_query = FALSE;
2949       g_cond_signal (&queue->query_handled);
2950     }
2951     GST_QUEUE2_MUTEX_UNLOCK (queue);
2952     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2953         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
2954     /* let app know about us giving up if upstream is not expected to do so */
2955     /* EOS is already taken care of elsewhere */
2956     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
2957       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2958           (_("Internal data flow error.")),
2959           ("streaming task paused, reason %s (%d)",
2960               gst_flow_get_name (ret), ret));
2961       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
2962     }
2963     return;
2964   }
2965 }
2966
2967 static gboolean
2968 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2969 {
2970   gboolean res = TRUE;
2971   GstQueue2 *queue = GST_QUEUE2 (parent);
2972
2973 #ifndef GST_DISABLE_GST_DEBUG
2974   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
2975       event, GST_EVENT_TYPE_NAME (event));
2976 #endif
2977
2978   switch (GST_EVENT_TYPE (event)) {
2979     case GST_EVENT_FLUSH_START:
2980       if (QUEUE_IS_USING_QUEUE (queue)) {
2981         /* just forward upstream */
2982         res = gst_pad_push_event (queue->sinkpad, event);
2983       } else {
2984         /* now unblock the getrange function */
2985         GST_QUEUE2_MUTEX_LOCK (queue);
2986         GST_DEBUG_OBJECT (queue, "flushing");
2987         queue->srcresult = GST_FLOW_FLUSHING;
2988         GST_QUEUE2_SIGNAL_ADD (queue);
2989         GST_QUEUE2_MUTEX_UNLOCK (queue);
2990
2991         /* when using a temp file, we eat the event */
2992         res = TRUE;
2993         gst_event_unref (event);
2994       }
2995       break;
2996     case GST_EVENT_FLUSH_STOP:
2997       if (QUEUE_IS_USING_QUEUE (queue)) {
2998         /* just forward upstream */
2999         res = gst_pad_push_event (queue->sinkpad, event);
3000       } else {
3001         /* now unblock the getrange function */
3002         GST_QUEUE2_MUTEX_LOCK (queue);
3003         queue->srcresult = GST_FLOW_OK;
3004         GST_QUEUE2_MUTEX_UNLOCK (queue);
3005
3006         /* when using a temp file, we eat the event */
3007         res = TRUE;
3008         gst_event_unref (event);
3009       }
3010       break;
3011     case GST_EVENT_RECONFIGURE:
3012       GST_QUEUE2_MUTEX_LOCK (queue);
3013       /* assume downstream is linked now and try to push again */
3014       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
3015         queue->srcresult = GST_FLOW_OK;
3016         queue->sinkresult = GST_FLOW_OK;
3017         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
3018           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
3019               NULL);
3020         }
3021       }
3022       GST_QUEUE2_MUTEX_UNLOCK (queue);
3023
3024       res = gst_pad_push_event (queue->sinkpad, event);
3025       break;
3026     default:
3027       res = gst_pad_push_event (queue->sinkpad, event);
3028       break;
3029   }
3030
3031   return res;
3032 }
3033
3034 static gboolean
3035 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3036 {
3037   GstQueue2 *queue;
3038
3039   queue = GST_QUEUE2 (parent);
3040
3041   switch (GST_QUERY_TYPE (query)) {
3042     case GST_QUERY_POSITION:
3043     {
3044       gint64 peer_pos;
3045       GstFormat format;
3046
3047       if (!gst_pad_peer_query (queue->sinkpad, query))
3048         goto peer_failed;
3049
3050       /* get peer position */
3051       gst_query_parse_position (query, &format, &peer_pos);
3052
3053       /* FIXME: this code assumes that there's no discont in the queue */
3054       switch (format) {
3055         case GST_FORMAT_BYTES:
3056           peer_pos -= queue->cur_level.bytes;
3057           break;
3058         case GST_FORMAT_TIME:
3059           peer_pos -= queue->cur_level.time;
3060           break;
3061         default:
3062           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3063               "know how to adjust value", gst_format_get_name (format));
3064           return FALSE;
3065       }
3066       /* set updated position */
3067       gst_query_set_position (query, format, peer_pos);
3068       break;
3069     }
3070     case GST_QUERY_DURATION:
3071     {
3072       GST_DEBUG_OBJECT (queue, "doing peer query");
3073
3074       if (!gst_pad_peer_query (queue->sinkpad, query))
3075         goto peer_failed;
3076
3077       GST_DEBUG_OBJECT (queue, "peer query success");
3078       break;
3079     }
3080     case GST_QUERY_BUFFERING:
3081     {
3082       gint percent;
3083       gboolean is_buffering;
3084       GstBufferingMode mode;
3085       gint avg_in, avg_out;
3086       gint64 buffering_left;
3087
3088       GST_DEBUG_OBJECT (queue, "query buffering");
3089
3090       get_buffering_percent (queue, &is_buffering, &percent);
3091       gst_query_set_buffering_percent (query, is_buffering, percent);
3092
3093       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3094           &buffering_left);
3095       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3096           buffering_left);
3097
3098       if (!QUEUE_IS_USING_QUEUE (queue)) {
3099         /* add ranges for download and ringbuffer buffering */
3100         GstFormat format;
3101         gint64 start, stop, range_start, range_stop;
3102         guint64 writing_pos;
3103         gint64 estimated_total;
3104         gint64 duration;
3105         gboolean peer_res, is_eos;
3106         GstQueue2Range *queued_ranges;
3107
3108         /* we need a current download region */
3109         if (queue->current == NULL)
3110           return FALSE;
3111
3112         writing_pos = queue->current->writing_pos;
3113         is_eos = queue->is_eos;
3114
3115         if (is_eos) {
3116           /* we're EOS, we know the duration in bytes now */
3117           peer_res = TRUE;
3118           duration = writing_pos;
3119         } else {
3120           /* get duration of upstream in bytes */
3121           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3122               GST_FORMAT_BYTES, &duration);
3123         }
3124
3125         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3126             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3127
3128         /* calculate remaining and total download time */
3129         if (peer_res && avg_in > 0.0)
3130           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3131         else
3132           estimated_total = -1;
3133
3134         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3135             estimated_total);
3136
3137         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3138
3139         switch (format) {
3140           case GST_FORMAT_PERCENT:
3141             /* we need duration */
3142             if (!peer_res)
3143               goto peer_failed;
3144
3145             start = 0;
3146             /* get our available data relative to the duration */
3147             if (duration != -1)
3148               stop =
3149                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3150                   duration);
3151             else
3152               stop = -1;
3153             break;
3154           case GST_FORMAT_BYTES:
3155             start = 0;
3156             stop = writing_pos;
3157             break;
3158           default:
3159             start = -1;
3160             stop = -1;
3161             break;
3162         }
3163
3164         /* fill out the buffered ranges */
3165         for (queued_ranges = queue->ranges; queued_ranges;
3166             queued_ranges = queued_ranges->next) {
3167           switch (format) {
3168             case GST_FORMAT_PERCENT:
3169               if (duration == -1) {
3170                 range_start = 0;
3171                 range_stop = 0;
3172                 break;
3173               }
3174 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3175               range_start =
3176                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3177                   queued_ranges->reading_pos, duration);
3178 #else
3179               range_start =
3180                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3181                   queued_ranges->offset, duration);
3182 #endif
3183               range_stop =
3184                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3185                   queued_ranges->writing_pos, duration);
3186               break;
3187             case GST_FORMAT_BYTES:
3188 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3189               range_start = queued_ranges->reading_pos;
3190 #else
3191               range_start = queued_ranges->offset;
3192 #endif
3193               range_stop = queued_ranges->writing_pos;
3194               break;
3195             default:
3196               range_start = -1;
3197               range_stop = -1;
3198               break;
3199           }
3200           if (range_start == range_stop)
3201             continue;
3202           GST_DEBUG_OBJECT (queue,
3203               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3204               G_GINT64_FORMAT, range_start, range_stop);
3205           gst_query_add_buffering_range (query, range_start, range_stop);
3206         }
3207
3208         gst_query_set_buffering_range (query, format, start, stop,
3209             estimated_total);
3210       }
3211       break;
3212     }
3213     case GST_QUERY_SCHEDULING:
3214     {
3215       gboolean pull_mode;
3216       GstSchedulingFlags flags = 0;
3217 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3218       if ((!QUEUE_IS_USING_QUEUE (queue)) && !gst_pad_peer_query (queue->sinkpad, query))
3219 #else
3220       if (!gst_pad_peer_query (queue->sinkpad, query))
3221 #endif
3222         goto peer_failed;
3223
3224       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3225
3226       /* we can operate in pull mode when we are using a tempfile */
3227       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3228
3229       if (pull_mode)
3230         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3231       gst_query_set_scheduling (query, flags, 0, -1, 0);
3232       if (pull_mode)
3233         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3234       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3235       break;
3236     }
3237     default:
3238       /* peer handled other queries */
3239       if (!gst_pad_query_default (pad, parent, query))
3240         goto peer_failed;
3241       break;
3242   }
3243
3244   return TRUE;
3245
3246   /* ERRORS */
3247 peer_failed:
3248   {
3249     GST_DEBUG_OBJECT (queue, "failed peer query");
3250     return FALSE;
3251   }
3252 }
3253
3254 static gboolean
3255 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3256 {
3257   GstQueue2 *queue = GST_QUEUE2 (element);
3258
3259   /* simply forward to the srcpad query function */
3260   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3261       query);
3262 }
3263
3264 static void
3265 gst_queue2_update_upstream_size (GstQueue2 * queue)
3266 {
3267   gint64 upstream_size = -1;
3268
3269   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3270           &upstream_size)) {
3271     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3272
3273     /* upstream_size can be negative but queue->upstream_size is unsigned.
3274      * Prevent setting negative values to it (the query can return -1) */
3275     if (upstream_size >= 0)
3276       queue->upstream_size = upstream_size;
3277     else
3278       queue->upstream_size = 0;
3279   }
3280 }
3281
3282 static GstFlowReturn
3283 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3284     guint length, GstBuffer ** buffer)
3285 {
3286   GstQueue2 *queue;
3287   GstFlowReturn ret;
3288
3289   queue = GST_QUEUE2_CAST (parent);
3290
3291   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3292   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3293   offset = (offset == -1) ? queue->current->reading_pos : offset;
3294
3295   GST_DEBUG_OBJECT (queue,
3296       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3297
3298   /* catch any reads beyond the size of the file here to make sure queue2
3299    * doesn't send seek events beyond the size of the file upstream, since
3300    * that would confuse elements such as souphttpsrc and/or http servers.
3301    * Demuxers often just loop until EOS at the end of the file to figure out
3302    * when they've read all the end-headers or index chunks. */
3303   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3304     gst_queue2_update_upstream_size (queue);
3305     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3306       goto out_unexpected;
3307   }
3308
3309   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3310     gst_queue2_update_upstream_size (queue);
3311     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3312       length = queue->upstream_size - offset;
3313       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3314     }
3315   }
3316
3317   /* FIXME - function will block when the range is not yet available */
3318   ret = gst_queue2_create_read (queue, offset, length, buffer);
3319   GST_QUEUE2_MUTEX_UNLOCK (queue);
3320   gst_queue2_post_buffering (queue);
3321
3322   return ret;
3323
3324   /* ERRORS */
3325 out_flushing:
3326   {
3327     ret = queue->srcresult;
3328
3329     GST_DEBUG_OBJECT (queue, "we are flushing");
3330     GST_QUEUE2_MUTEX_UNLOCK (queue);
3331     return ret;
3332   }
3333 out_unexpected:
3334   {
3335     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3336     GST_QUEUE2_MUTEX_UNLOCK (queue);
3337     return GST_FLOW_EOS;
3338   }
3339 }
3340
3341 /* sink currently only operates in push mode */
3342 static gboolean
3343 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3344     GstPadMode mode, gboolean active)
3345 {
3346   gboolean result;
3347   GstQueue2 *queue;
3348
3349   queue = GST_QUEUE2 (parent);
3350
3351   switch (mode) {
3352     case GST_PAD_MODE_PUSH:
3353       if (active) {
3354         GST_QUEUE2_MUTEX_LOCK (queue);
3355         GST_DEBUG_OBJECT (queue, "activating push mode");
3356         queue->srcresult = GST_FLOW_OK;
3357         queue->sinkresult = GST_FLOW_OK;
3358         queue->is_eos = FALSE;
3359         queue->unexpected = FALSE;
3360         reset_rate_timer (queue);
3361         GST_QUEUE2_MUTEX_UNLOCK (queue);
3362       } else {
3363         /* unblock chain function */
3364         GST_QUEUE2_MUTEX_LOCK (queue);
3365         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3366         queue->srcresult = GST_FLOW_FLUSHING;
3367         queue->sinkresult = GST_FLOW_FLUSHING;
3368         GST_QUEUE2_SIGNAL_DEL (queue);
3369         /* Unblock query handler */
3370         queue->last_query = FALSE;
3371         g_cond_signal (&queue->query_handled);
3372         GST_QUEUE2_MUTEX_UNLOCK (queue);
3373
3374         /* wait until it is unblocked and clean up */
3375         GST_PAD_STREAM_LOCK (pad);
3376         GST_QUEUE2_MUTEX_LOCK (queue);
3377         gst_queue2_locked_flush (queue, TRUE, FALSE);
3378         GST_QUEUE2_MUTEX_UNLOCK (queue);
3379         GST_PAD_STREAM_UNLOCK (pad);
3380       }
3381       result = TRUE;
3382       break;
3383     default:
3384       result = FALSE;
3385       break;
3386   }
3387   return result;
3388 }
3389
3390 /* src operating in push mode, we start a task on the source pad that pushes out
3391  * buffers from the queue */
3392 static gboolean
3393 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3394 {
3395   gboolean result = FALSE;
3396   GstQueue2 *queue;
3397
3398   queue = GST_QUEUE2 (parent);
3399
3400   if (active) {
3401     GST_QUEUE2_MUTEX_LOCK (queue);
3402     GST_DEBUG_OBJECT (queue, "activating push mode");
3403     queue->srcresult = GST_FLOW_OK;
3404     queue->sinkresult = GST_FLOW_OK;
3405     queue->is_eos = FALSE;
3406     queue->unexpected = FALSE;
3407     result =
3408         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3409     GST_QUEUE2_MUTEX_UNLOCK (queue);
3410   } else {
3411     /* unblock loop function */
3412     GST_QUEUE2_MUTEX_LOCK (queue);
3413     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3414     queue->srcresult = GST_FLOW_FLUSHING;
3415     queue->sinkresult = GST_FLOW_FLUSHING;
3416     /* the item add signal will unblock */
3417     GST_QUEUE2_SIGNAL_ADD (queue);
3418     GST_QUEUE2_MUTEX_UNLOCK (queue);
3419
3420     /* step 2, make sure streaming finishes */
3421     result = gst_pad_stop_task (pad);
3422   }
3423
3424   return result;
3425 }
3426
3427 /* pull mode, downstream will call our getrange function */
3428 static gboolean
3429 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3430 {
3431   gboolean result;
3432   GstQueue2 *queue;
3433
3434   queue = GST_QUEUE2 (parent);
3435
3436   if (active) {
3437     GST_QUEUE2_MUTEX_LOCK (queue);
3438     if (!QUEUE_IS_USING_QUEUE (queue)) {
3439       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3440         /* open the temp file now */
3441         result = gst_queue2_open_temp_location_file (queue);
3442       } else if (!queue->ring_buffer) {
3443         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3444         result = ! !queue->ring_buffer;
3445       } else {
3446         result = TRUE;
3447       }
3448
3449       GST_DEBUG_OBJECT (queue, "activating pull mode");
3450       init_ranges (queue);
3451       queue->srcresult = GST_FLOW_OK;
3452       queue->sinkresult = GST_FLOW_OK;
3453       queue->is_eos = FALSE;
3454       queue->unexpected = FALSE;
3455       queue->upstream_size = 0;
3456     } else {
3457       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3458       /* this is not allowed, we cannot operate in pull mode without a temp
3459        * file. */
3460       queue->srcresult = GST_FLOW_FLUSHING;
3461       queue->sinkresult = GST_FLOW_FLUSHING;
3462       result = FALSE;
3463     }
3464     GST_QUEUE2_MUTEX_UNLOCK (queue);
3465   } else {
3466     GST_QUEUE2_MUTEX_LOCK (queue);
3467     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3468     queue->srcresult = GST_FLOW_FLUSHING;
3469     queue->sinkresult = GST_FLOW_FLUSHING;
3470     /* this will unlock getrange */
3471     GST_QUEUE2_SIGNAL_ADD (queue);
3472     result = TRUE;
3473     GST_QUEUE2_MUTEX_UNLOCK (queue);
3474   }
3475
3476   return result;
3477 }
3478
3479 static gboolean
3480 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3481     gboolean active)
3482 {
3483   gboolean res;
3484
3485   switch (mode) {
3486     case GST_PAD_MODE_PULL:
3487       res = gst_queue2_src_activate_pull (pad, parent, active);
3488       break;
3489     case GST_PAD_MODE_PUSH:
3490       res = gst_queue2_src_activate_push (pad, parent, active);
3491       break;
3492     default:
3493       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3494       res = FALSE;
3495       break;
3496   }
3497   return res;
3498 }
3499
3500 static GstStateChangeReturn
3501 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3502 {
3503   GstQueue2 *queue;
3504   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3505
3506   queue = GST_QUEUE2 (element);
3507
3508   switch (transition) {
3509     case GST_STATE_CHANGE_NULL_TO_READY:
3510       break;
3511     case GST_STATE_CHANGE_READY_TO_PAUSED:
3512       GST_QUEUE2_MUTEX_LOCK (queue);
3513       if (!QUEUE_IS_USING_QUEUE (queue)) {
3514         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3515           if (!gst_queue2_open_temp_location_file (queue))
3516             ret = GST_STATE_CHANGE_FAILURE;
3517         } else {
3518           if (queue->ring_buffer) {
3519             g_free (queue->ring_buffer);
3520             queue->ring_buffer = NULL;
3521           }
3522           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3523             ret = GST_STATE_CHANGE_FAILURE;
3524         }
3525         init_ranges (queue);
3526       }
3527       queue->segment_event_received = FALSE;
3528       queue->starting_segment = NULL;
3529       gst_event_replace (&queue->stream_start_event, NULL);
3530       GST_QUEUE2_MUTEX_UNLOCK (queue);
3531       break;
3532     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3533       break;
3534     default:
3535       break;
3536   }
3537
3538   if (ret == GST_STATE_CHANGE_FAILURE)
3539     return ret;
3540
3541   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3542
3543   if (ret == GST_STATE_CHANGE_FAILURE)
3544     return ret;
3545
3546   switch (transition) {
3547     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3548       break;
3549     case GST_STATE_CHANGE_PAUSED_TO_READY:
3550       GST_QUEUE2_MUTEX_LOCK (queue);
3551       if (!QUEUE_IS_USING_QUEUE (queue)) {
3552         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3553           gst_queue2_close_temp_location_file (queue);
3554         } else if (queue->ring_buffer) {
3555           g_free (queue->ring_buffer);
3556           queue->ring_buffer = NULL;
3557         }
3558         clean_ranges (queue);
3559       }
3560       if (queue->starting_segment != NULL) {
3561         gst_event_unref (queue->starting_segment);
3562         queue->starting_segment = NULL;
3563       }
3564       gst_event_replace (&queue->stream_start_event, NULL);
3565       GST_QUEUE2_MUTEX_UNLOCK (queue);
3566       break;
3567     case GST_STATE_CHANGE_READY_TO_NULL:
3568       break;
3569     default:
3570       break;
3571   }
3572
3573   return ret;
3574 }
3575
3576 /* changing the capacity of the queue must wake up
3577  * the _chain function, it might have more room now
3578  * to store the buffer/event in the queue */
3579 #define QUEUE_CAPACITY_CHANGE(q) \
3580   GST_QUEUE2_SIGNAL_DEL (queue); \
3581   if (queue->use_buffering)      \
3582     update_buffering (queue);
3583
3584 /* Changing the minimum required fill level must
3585  * wake up the _loop function as it might now
3586  * be able to preceed.
3587  */
3588 #define QUEUE_THRESHOLD_CHANGE(q)\
3589   GST_QUEUE2_SIGNAL_ADD (queue);
3590
3591 static void
3592 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3593 {
3594   GstState state;
3595
3596   /* the element must be stopped in order to do this */
3597   GST_OBJECT_LOCK (queue);
3598   state = GST_STATE (queue);
3599   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3600     goto wrong_state;
3601   GST_OBJECT_UNLOCK (queue);
3602
3603   /* set new location */
3604   g_free (queue->temp_template);
3605   queue->temp_template = g_strdup (template);
3606
3607   return;
3608
3609 /* ERROR */
3610 wrong_state:
3611   {
3612     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3613     GST_OBJECT_UNLOCK (queue);
3614   }
3615 }
3616
3617 static void
3618 gst_queue2_set_property (GObject * object,
3619     guint prop_id, const GValue * value, GParamSpec * pspec)
3620 {
3621   GstQueue2 *queue = GST_QUEUE2 (object);
3622
3623   /* someone could change levels here, and since this
3624    * affects the get/put funcs, we need to lock for safety. */
3625   GST_QUEUE2_MUTEX_LOCK (queue);
3626
3627   switch (prop_id) {
3628     case PROP_MAX_SIZE_BYTES:
3629       queue->max_level.bytes = g_value_get_uint (value);
3630       QUEUE_CAPACITY_CHANGE (queue);
3631       break;
3632     case PROP_MAX_SIZE_BUFFERS:
3633       queue->max_level.buffers = g_value_get_uint (value);
3634       QUEUE_CAPACITY_CHANGE (queue);
3635       break;
3636     case PROP_MAX_SIZE_TIME:
3637       queue->max_level.time = g_value_get_uint64 (value);
3638       /* set rate_time to the same value. We use an extra field in the level
3639        * structure so that we can easily access and compare it */
3640       queue->max_level.rate_time = queue->max_level.time;
3641       QUEUE_CAPACITY_CHANGE (queue);
3642       break;
3643     case PROP_USE_BUFFERING:
3644       queue->use_buffering = g_value_get_boolean (value);
3645       if (!queue->use_buffering && queue->is_buffering) {
3646         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3647             "posting 100%% message");
3648         SET_PERCENT (queue, 100);
3649         queue->is_buffering = FALSE;
3650       }
3651
3652       if (queue->use_buffering) {
3653         queue->is_buffering = TRUE;
3654         update_buffering (queue);
3655       }
3656       break;
3657     case PROP_USE_RATE_ESTIMATE:
3658       queue->use_rate_estimate = g_value_get_boolean (value);
3659       break;
3660     case PROP_LOW_PERCENT:
3661       queue->low_percent = g_value_get_int (value);
3662       break;
3663     case PROP_HIGH_PERCENT:
3664       queue->high_percent = g_value_get_int (value);
3665       break;
3666     case PROP_TEMP_TEMPLATE:
3667       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3668       break;
3669     case PROP_TEMP_REMOVE:
3670       queue->temp_remove = g_value_get_boolean (value);
3671       break;
3672     case PROP_RING_BUFFER_MAX_SIZE:
3673       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3674       break;
3675     default:
3676       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3677       break;
3678   }
3679
3680   GST_QUEUE2_MUTEX_UNLOCK (queue);
3681   gst_queue2_post_buffering (queue);
3682 }
3683
3684 static void
3685 gst_queue2_get_property (GObject * object,
3686     guint prop_id, GValue * value, GParamSpec * pspec)
3687 {
3688   GstQueue2 *queue = GST_QUEUE2 (object);
3689
3690   GST_QUEUE2_MUTEX_LOCK (queue);
3691
3692   switch (prop_id) {
3693     case PROP_CUR_LEVEL_BYTES:
3694       g_value_set_uint (value, queue->cur_level.bytes);
3695       break;
3696     case PROP_CUR_LEVEL_BUFFERS:
3697       g_value_set_uint (value, queue->cur_level.buffers);
3698       break;
3699     case PROP_CUR_LEVEL_TIME:
3700       g_value_set_uint64 (value, queue->cur_level.time);
3701       break;
3702     case PROP_MAX_SIZE_BYTES:
3703       g_value_set_uint (value, queue->max_level.bytes);
3704       break;
3705     case PROP_MAX_SIZE_BUFFERS:
3706       g_value_set_uint (value, queue->max_level.buffers);
3707       break;
3708     case PROP_MAX_SIZE_TIME:
3709       g_value_set_uint64 (value, queue->max_level.time);
3710       break;
3711     case PROP_USE_BUFFERING:
3712       g_value_set_boolean (value, queue->use_buffering);
3713       break;
3714     case PROP_USE_RATE_ESTIMATE:
3715       g_value_set_boolean (value, queue->use_rate_estimate);
3716       break;
3717     case PROP_LOW_PERCENT:
3718       g_value_set_int (value, queue->low_percent);
3719       break;
3720     case PROP_HIGH_PERCENT:
3721       g_value_set_int (value, queue->high_percent);
3722       break;
3723     case PROP_TEMP_TEMPLATE:
3724       g_value_set_string (value, queue->temp_template);
3725       break;
3726     case PROP_TEMP_LOCATION:
3727       g_value_set_string (value, queue->temp_location);
3728       break;
3729     case PROP_TEMP_REMOVE:
3730       g_value_set_boolean (value, queue->temp_remove);
3731       break;
3732     case PROP_RING_BUFFER_MAX_SIZE:
3733       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3734       break;
3735     default:
3736       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3737       break;
3738   }
3739
3740   GST_QUEUE2_MUTEX_UNLOCK (queue);
3741 }