Merge branch 'upstream/1.16' into tizen_gst_1.16.2
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  * @title: queue2
29  *
30  * Data is queued until one of the limits specified by the
31  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
32  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
33  * more buffers into the queue will block the pushing thread until more space
34  * becomes available.
35  *
36  * The queue will create a new thread on the source pad to decouple the
37  * processing on sink and source pad.
38  *
39  * You can query how many buffers are queued by reading the
40  * #GstQueue2:current-level-buffers property.
41  *
42  * The default queue size limits are 100 buffers, 2MB of data, or
43  * two seconds worth of data, whichever is reached first.
44  *
45  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
46  * will allocate a random free filename and buffer data in the file.
47  * By using this, it will buffer the entire stream data on the file independently
48  * of the queue size limits, they will only be used for buffering statistics.
49  *
50  * The temp-location property will be used to notify the application of the
51  * allocated filename.
52  */
53
54 #ifdef HAVE_CONFIG_H
55 #include "config.h"
56 #endif
57
58 #include "gstqueue2.h"
59
60 #include <glib/gstdio.h>
61
62 #include "gst/gst-i18n-lib.h"
63 #include "gst/glib-compat-private.h"
64
65 #include <string.h>
66
67 #ifdef G_OS_WIN32
68 #include <io.h>                 /* lseek, open, close, read */
69 #undef lseek
70 #define lseek _lseeki64
71 #undef off_t
72 #define off_t guint64
73 #else
74 #include <unistd.h>
75 #endif
76
77 #ifdef __BIONIC__               /* Android */
78 #include <fcntl.h>
79 #endif
80
81 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
82     GST_PAD_SINK,
83     GST_PAD_ALWAYS,
84     GST_STATIC_CAPS_ANY);
85
86 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
87     GST_PAD_SRC,
88     GST_PAD_ALWAYS,
89     GST_STATIC_CAPS_ANY);
90
91 GST_DEBUG_CATEGORY_STATIC (queue_debug);
92 #define GST_CAT_DEFAULT (queue_debug)
93 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
94
95 enum
96 {
97   LAST_SIGNAL
98 };
99
100 /* other defines */
101 #define DEFAULT_BUFFER_SIZE 4096
102 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
103 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
104 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
105
106 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
107
108 /* default property values */
109 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
110 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
111 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
112 #define DEFAULT_USE_BUFFERING      FALSE
113 #define DEFAULT_USE_TAGS_BITRATE   FALSE
114 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
115 #define DEFAULT_LOW_PERCENT        10
116 #define DEFAULT_HIGH_PERCENT       99
117 #define DEFAULT_LOW_WATERMARK      0.01
118 #define DEFAULT_HIGH_WATERMARK     0.99
119 #define DEFAULT_TEMP_REMOVE        TRUE
120 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
121 #define DEFAULT_USE_BITRATE_QUERY  TRUE
122
123 enum
124 {
125   PROP_0,
126   PROP_CUR_LEVEL_BUFFERS,
127   PROP_CUR_LEVEL_BYTES,
128   PROP_CUR_LEVEL_TIME,
129   PROP_MAX_SIZE_BUFFERS,
130   PROP_MAX_SIZE_BYTES,
131   PROP_MAX_SIZE_TIME,
132   PROP_USE_BUFFERING,
133   PROP_USE_TAGS_BITRATE,
134   PROP_USE_RATE_ESTIMATE,
135   PROP_LOW_PERCENT,
136   PROP_HIGH_PERCENT,
137   PROP_LOW_WATERMARK,
138   PROP_HIGH_WATERMARK,
139   PROP_TEMP_TEMPLATE,
140   PROP_TEMP_LOCATION,
141   PROP_TEMP_REMOVE,
142   PROP_RING_BUFFER_MAX_SIZE,
143   PROP_AVG_IN_RATE,
144   PROP_USE_BITRATE_QUERY,
145   PROP_BITRATE,
146 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
147   PROP_BUFFER_MODE,
148 #endif
149   PROP_LAST
150 };
151
152 /* Explanation for buffer levels and percentages:
153  *
154  * The buffering_level functions here return a value in a normalized range
155  * that specifies the queue's current fill level. The range goes from 0 to
156  * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
157  *
158  * This is not to be confused with the buffering_percent value, which is
159  * a *relative* quantity - relative to the low/high watermarks.
160  * buffering_percent = 0% means buffering_level is at the low watermark.
161  * buffering_percent = 100% means buffering_level is at the high watermark.
162  * buffering_percent is used for determining if the fill level has reached
163  * the high watermark, and for producing BUFFERING messages. This value
164  * always uses a 0..100 range (since it is a percentage).
165  *
166  * To avoid future confusions, whenever "buffering level" is mentioned, it
167  * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
168  * range. Whenever "buffering_percent" is mentioned, it refers to the
169  * percentage value that is relative to the low/high watermark. */
170
171 /* Using a buffering level range of 0..1000000 to allow for a
172  * resolution in ppm (1 ppm = 0.0001%) */
173 #define MAX_BUFFERING_LEVEL 1000000
174
175 /* How much 1% makes up in the buffer level range */
176 #define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
177
178 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
179   l.buffers = 0;                                        \
180   l.bytes = 0;                                          \
181   l.time = 0;                                           \
182   l.rate_time = 0;                                      \
183 } G_STMT_END
184
185 #define STATUS(queue, pad, msg) \
186   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
187                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
188                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
189                       " ns, %"G_GUINT64_FORMAT" items", \
190                       GST_DEBUG_PAD_NAME (pad), \
191                       queue->cur_level.buffers, \
192                       queue->max_level.buffers, \
193                       queue->cur_level.bytes, \
194                       queue->max_level.bytes, \
195                       queue->cur_level.time, \
196                       queue->max_level.time, \
197                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
198                         queue->current->writing_pos - queue->current->max_reading_pos : \
199                         gst_queue_array_get_length(queue->queue)))
200
201 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
202   g_mutex_lock (&q->qlock);                                              \
203 } G_STMT_END
204
205 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
206   GST_QUEUE2_MUTEX_LOCK (q);                                            \
207   if (res != GST_FLOW_OK)                                               \
208     goto label;                                                         \
209 } G_STMT_END
210
211 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
212   g_mutex_unlock (&q->qlock);                                            \
213 } G_STMT_END
214
215 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
216   STATUS (queue, q->sinkpad, "wait for DEL");                           \
217   q->waiting_del = TRUE;                                                \
218   g_cond_wait (&q->item_del, &queue->qlock);                              \
219   q->waiting_del = FALSE;                                               \
220   if (res != GST_FLOW_OK) {                                             \
221     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
222     goto label;                                                         \
223   }                                                                     \
224   STATUS (queue, q->sinkpad, "received DEL");                           \
225 } G_STMT_END
226
227 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
228   STATUS (queue, q->srcpad, "wait for ADD");                            \
229   q->waiting_add = TRUE;                                                \
230   g_cond_wait (&q->item_add, &q->qlock);                                  \
231   q->waiting_add = FALSE;                                               \
232   if (res != GST_FLOW_OK) {                                             \
233     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
234     goto label;                                                         \
235   }                                                                     \
236   STATUS (queue, q->srcpad, "received ADD");                            \
237 } G_STMT_END
238
239 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
240   if (q->waiting_del) {                                                 \
241     STATUS (q, q->srcpad, "signal DEL");                                \
242     g_cond_signal (&q->item_del);                                        \
243   }                                                                     \
244 } G_STMT_END
245
246 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
247   if (q->waiting_add) {                                                 \
248     STATUS (q, q->sinkpad, "signal ADD");                               \
249     g_cond_signal (&q->item_add);                                        \
250   }                                                                     \
251 } G_STMT_END
252
253 #define SET_PERCENT(q, perc) G_STMT_START {                              \
254   if (perc != q->buffering_percent) {                                    \
255     q->buffering_percent = perc;                                         \
256     q->percent_changed = TRUE;                                           \
257     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
258     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
259         &q->buffering_left);                                             \
260   }                                                                      \
261 } G_STMT_END
262
263 #define _do_init \
264     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
265     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
266         "dataflow inside the queue element");
267 #define gst_queue2_parent_class parent_class
268 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
269
270 static void gst_queue2_finalize (GObject * object);
271
272 static void gst_queue2_set_property (GObject * object,
273     guint prop_id, const GValue * value, GParamSpec * pspec);
274 static void gst_queue2_get_property (GObject * object,
275     guint prop_id, GValue * value, GParamSpec * pspec);
276
277 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
278     GstBuffer * buffer);
279 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
280     GstBufferList * buffer_list);
281 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
282 static void gst_queue2_loop (GstPad * pad);
283
284 static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad,
285     GstObject * parent, GstEvent * event);
286 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
287     GstQuery * query);
288
289 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
290     GstEvent * event);
291 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
292     GstQuery * query);
293 static gboolean gst_queue2_handle_query (GstElement * element,
294     GstQuery * query);
295
296 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
297     guint64 offset, guint length, GstBuffer ** buffer);
298
299 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
300     GstPadMode mode, gboolean active);
301 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
302     GstPadMode mode, gboolean active);
303 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
304     GstStateChange transition);
305
306 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
307 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
308
309 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
310 static void update_in_rates (GstQueue2 * queue, gboolean force);
311 static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue);
312 static void gst_queue2_post_buffering (GstQueue2 * queue);
313 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
314 static gboolean change_current_range(GstQueue2 * queue, GstQueue2Range *req_range, guint64 offset, guint length);
315 #endif
316
317 typedef enum
318 {
319   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
320   GST_QUEUE2_ITEM_TYPE_BUFFER,
321   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
322   GST_QUEUE2_ITEM_TYPE_EVENT,
323   GST_QUEUE2_ITEM_TYPE_QUERY
324 } GstQueue2ItemType;
325
326 typedef struct
327 {
328   GstQueue2ItemType type;
329   GstMiniObject *item;
330 } GstQueue2Item;
331
332 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
333
334 static void
335 gst_queue2_class_init (GstQueue2Class * klass)
336 {
337   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
338   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
339
340   gobject_class->set_property = gst_queue2_set_property;
341   gobject_class->get_property = gst_queue2_get_property;
342
343   /* properties */
344   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
345       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
346           "Current amount of data in the queue (bytes)",
347           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
348   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
349       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
350           "Current number of buffers in the queue",
351           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
352   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
353       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
354           "Current amount of data in the queue (in ns)",
355           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
356
357   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
358       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
359           "Max. amount of data in the queue (bytes, 0=disable)",
360           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
361           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
362           G_PARAM_STATIC_STRINGS));
363   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
364       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
365           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
366           DEFAULT_MAX_SIZE_BUFFERS,
367           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
368           G_PARAM_STATIC_STRINGS));
369   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
370       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
371           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
372           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
373           G_PARAM_STATIC_STRINGS));
374
375   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
376       g_param_spec_boolean ("use-buffering", "Use buffering",
377           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
378           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
379           G_PARAM_STATIC_STRINGS));
380   g_object_class_install_property (gobject_class, PROP_USE_TAGS_BITRATE,
381       g_param_spec_boolean ("use-tags-bitrate", "Use bitrate from tags",
382           "Use a bitrate from upstream tags to estimate buffer duration if not provided",
383           DEFAULT_USE_TAGS_BITRATE,
384           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
385           G_PARAM_STATIC_STRINGS));
386   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
387       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
388           "Estimate the bitrate of the stream to calculate time level",
389           DEFAULT_USE_RATE_ESTIMATE,
390           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
391   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
392       g_param_spec_int ("low-percent", "Low percent",
393           "Low threshold for buffering to start. Only used if use-buffering is True "
394           "(Deprecated: use low-watermark instead)",
395           0, 100, DEFAULT_LOW_WATERMARK * 100,
396           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
397   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
398       g_param_spec_int ("high-percent", "High percent",
399           "High threshold for buffering to finish. Only used if use-buffering is True "
400           "(Deprecated: use high-watermark instead)",
401           0, 100, DEFAULT_HIGH_WATERMARK * 100,
402           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
403   g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
404       g_param_spec_double ("low-watermark", "Low watermark",
405           "Low threshold for buffering to start. Only used if use-buffering is True",
406           0.0, 1.0, DEFAULT_LOW_WATERMARK,
407           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
408   g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
409       g_param_spec_double ("high-watermark", "High watermark",
410           "High threshold for buffering to finish. Only used if use-buffering is True",
411           0.0, 1.0, DEFAULT_HIGH_WATERMARK,
412           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413
414   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
415       g_param_spec_string ("temp-template", "Temporary File Template",
416           "File template to store temporary files in, should contain directory "
417           "and XXXXXX. (NULL == disabled)",
418           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
419
420   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
421       g_param_spec_string ("temp-location", "Temporary File Location",
422           "Location to store temporary files in (Only read this property, "
423           "use temp-template to configure the name template)",
424           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
425   g_object_class_install_property (gobject_class, PROP_USE_BITRATE_QUERY,
426       g_param_spec_boolean ("use-bitrate-query",
427           "Use bitrate from downstream query",
428           "Use a bitrate from a downstream query to estimate buffer duration if not provided",
429           DEFAULT_USE_BITRATE_QUERY,
430           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
431           G_PARAM_STATIC_STRINGS));
432
433   /**
434    * GstQueue2:temp-remove
435    *
436    * When temp-template is set, remove the temporary file when going to READY.
437    */
438   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
439       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
440           "Remove the temp-location after use",
441           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442
443   /**
444    * GstQueue2:ring-buffer-max-size
445    *
446    * The maximum size of the ring buffer in bytes. If set to 0, the ring
447    * buffer is disabled. Default 0.
448    */
449   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
450       g_param_spec_uint64 ("ring-buffer-max-size",
451           "Max. ring buffer size (bytes)",
452           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
453           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
454           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
455
456   /**
457    * GstQueue2:avg-in-rate
458    *
459    * The average input data rate.
460    */
461   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
462       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
463           "Average input data rate (bytes/s)",
464           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
465
466   /**
467    * GstQueue2:bitrate
468    *
469    * The value used to convert between byte and time values for limiting
470    * the size of the queue.  Values are taken from either the upstream tags
471    * or from the downstream bitrate query.
472    */
473   g_object_class_install_property (gobject_class, PROP_BITRATE,
474       g_param_spec_uint64 ("bitrate", "Bitrate (bits/s)",
475           "Conversion value between data size and time",
476           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
477
478 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
479   /**
480    * GstQueue2:buffer-mode:
481    *
482    * Control the buffering mode used by the queue2.
483    */
484   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
485       g_param_spec_enum ("buffer-mode", "Buffer Mode",
486           "Control the buffering algorithm in use",
487           GST_TYPE_BUFFERING_MODE, GST_BUFFERING_STREAM,
488           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
489 #endif
490
491   /* set several parent class virtual functions */
492   gobject_class->finalize = gst_queue2_finalize;
493
494   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
495   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
496
497   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
498       "Generic",
499       "Simple data queue",
500       "Erik Walthinsen <omega@cse.ogi.edu>, "
501       "Wim Taymans <wim.taymans@gmail.com>");
502
503   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
504   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
505 }
506
507 static void
508 gst_queue2_init (GstQueue2 * queue)
509 {
510   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
511
512   gst_pad_set_chain_function (queue->sinkpad,
513       GST_DEBUG_FUNCPTR (gst_queue2_chain));
514   gst_pad_set_chain_list_function (queue->sinkpad,
515       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
516   gst_pad_set_activatemode_function (queue->sinkpad,
517       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
518   gst_pad_set_event_full_function (queue->sinkpad,
519       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
520   gst_pad_set_query_function (queue->sinkpad,
521       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
522   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
523   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
524
525   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
526
527   gst_pad_set_activatemode_function (queue->srcpad,
528       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
529   gst_pad_set_getrange_function (queue->srcpad,
530       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
531   gst_pad_set_event_function (queue->srcpad,
532       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
533   gst_pad_set_query_function (queue->srcpad,
534       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
535   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
536   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
537
538   /* levels */
539   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
540   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
541   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
542   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
543   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
544   queue->use_buffering = DEFAULT_USE_BUFFERING;
545   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
546   queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
547   queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
548
549   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
550   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
551
552   queue->sinktime = GST_CLOCK_TIME_NONE;
553   queue->srctime = GST_CLOCK_TIME_NONE;
554   queue->sink_tainted = TRUE;
555   queue->src_tainted = TRUE;
556
557   queue->srcresult = GST_FLOW_FLUSHING;
558   queue->sinkresult = GST_FLOW_FLUSHING;
559   queue->is_eos = FALSE;
560   queue->in_timer = g_timer_new ();
561   queue->out_timer = g_timer_new ();
562
563   g_mutex_init (&queue->qlock);
564   queue->waiting_add = FALSE;
565   g_cond_init (&queue->item_add);
566   queue->waiting_del = FALSE;
567   g_cond_init (&queue->item_del);
568   queue->queue = gst_queue_array_new_for_struct (sizeof (GstQueue2Item), 32);
569
570   g_cond_init (&queue->query_handled);
571   queue->last_query = FALSE;
572
573   g_mutex_init (&queue->buffering_post_lock);
574   queue->buffering_percent = 100;
575   queue->last_posted_buffering_percent = -1;
576
577   /* tempfile related */
578   queue->temp_template = NULL;
579   queue->temp_location = NULL;
580   queue->temp_remove = DEFAULT_TEMP_REMOVE;
581
582   queue->ring_buffer = NULL;
583   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
584
585   queue->use_bitrate_query = DEFAULT_USE_BITRATE_QUERY;
586
587 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
588   queue->mode = -1;
589 #endif
590   GST_DEBUG_OBJECT (queue,
591       "initialized queue's not_empty & not_full conditions");
592 }
593
594 /* called only once, as opposed to dispose */
595 static void
596 gst_queue2_finalize (GObject * object)
597 {
598   GstQueue2 *queue = GST_QUEUE2 (object);
599   GstQueue2Item *qitem;
600
601   GST_DEBUG_OBJECT (queue, "finalizing queue");
602
603   while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
604     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
605       gst_mini_object_unref (qitem->item);
606   }
607   gst_queue_array_free (queue->queue);
608
609   queue->last_query = FALSE;
610   g_mutex_clear (&queue->qlock);
611   g_mutex_clear (&queue->buffering_post_lock);
612   g_cond_clear (&queue->item_add);
613   g_cond_clear (&queue->item_del);
614   g_cond_clear (&queue->query_handled);
615   g_timer_destroy (queue->in_timer);
616   g_timer_destroy (queue->out_timer);
617
618   /* temp_file path cleanup  */
619   g_free (queue->temp_template);
620   g_free (queue->temp_location);
621
622   G_OBJECT_CLASS (parent_class)->finalize (object);
623 }
624
625 static void
626 debug_ranges (GstQueue2 * queue)
627 {
628   GstQueue2Range *walk;
629
630   for (walk = queue->ranges; walk; walk = walk->next) {
631     GST_DEBUG_OBJECT (queue,
632         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
633         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
634         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
635         walk->rb_writing_pos, walk->reading_pos,
636         walk == queue->current ? "**y**" : "  n  ");
637   }
638 }
639
640 /* clear all the downloaded ranges */
641 static void
642 clean_ranges (GstQueue2 * queue)
643 {
644   GST_DEBUG_OBJECT (queue, "clean queue ranges");
645
646   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
647   queue->ranges = NULL;
648   queue->current = NULL;
649 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
650   queue->read = NULL;
651 #endif
652 }
653
654 /* find a range that contains @offset or NULL when nothing does */
655 static GstQueue2Range *
656 find_range (GstQueue2 * queue, guint64 offset)
657 {
658   GstQueue2Range *range = NULL;
659   GstQueue2Range *walk;
660
661   /* first do a quick check for the current range */
662   for (walk = queue->ranges; walk; walk = walk->next) {
663     if (offset >= walk->offset && offset <= walk->writing_pos) {
664       /* we can reuse an existing range */
665       range = walk;
666       break;
667     }
668   }
669   if (range) {
670     GST_DEBUG_OBJECT (queue,
671         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
672         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
673   } else {
674     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
675   }
676   return range;
677 }
678
679 static void
680 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
681 {
682   guint64 max_reading_pos, writing_pos;
683
684   writing_pos = range->writing_pos;
685   max_reading_pos = range->max_reading_pos;
686
687   if (writing_pos > max_reading_pos)
688     queue->cur_level.bytes = writing_pos - max_reading_pos;
689   else
690     queue->cur_level.bytes = 0;
691 }
692
693 /* make a new range for @offset or reuse an existing range */
694 static GstQueue2Range *
695 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
696 {
697   GstQueue2Range *range, *prev, *next;
698
699   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
700
701   if ((range = find_range (queue, offset))) {
702     GST_DEBUG_OBJECT (queue,
703         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
704         range->writing_pos);
705     if (update_existing && range->writing_pos != offset) {
706       GST_DEBUG_OBJECT (queue, "updating range writing position to "
707           "%" G_GUINT64_FORMAT, offset);
708       range->writing_pos = offset;
709     }
710   } else {
711     GST_DEBUG_OBJECT (queue,
712         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
713
714     range = g_slice_new0 (GstQueue2Range);
715     range->offset = offset;
716     /* we want to write to the next location in the ring buffer */
717     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
718     range->writing_pos = offset;
719     range->rb_writing_pos = range->rb_offset;
720     range->reading_pos = offset;
721     range->max_reading_pos = offset;
722
723     /* insert sorted */
724     prev = NULL;
725     next = queue->ranges;
726     while (next) {
727       if (next->offset > offset) {
728         /* insert before next */
729         GST_DEBUG_OBJECT (queue,
730             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
731             next->offset);
732         break;
733       }
734       /* try next */
735       prev = next;
736       next = next->next;
737     }
738     range->next = next;
739     if (prev)
740       prev->next = range;
741     else
742       queue->ranges = range;
743   }
744   debug_ranges (queue);
745
746   /* update the stats for this range */
747   update_cur_level (queue, range);
748
749   return range;
750 }
751
752
753 /* clear and init the download ranges for offset 0 */
754 static void
755 init_ranges (GstQueue2 * queue)
756 {
757   GST_DEBUG_OBJECT (queue, "init queue ranges");
758
759   /* get rid of all the current ranges */
760   clean_ranges (queue);
761   /* make a range for offset 0 */
762 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
763   queue->current = queue->read = add_range (queue, 0, TRUE);
764 #else
765   queue->current = add_range (queue, 0, TRUE);
766 #endif
767 }
768
769 /* calculate the diff between running time on the sink and src of the queue.
770  * This is the total amount of time in the queue. */
771 static void
772 update_time_level (GstQueue2 * queue)
773 {
774   if (queue->sink_tainted) {
775     queue->sinktime =
776         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
777         queue->sink_segment.position);
778     queue->sink_tainted = FALSE;
779   }
780
781   if (queue->src_tainted) {
782     queue->srctime =
783         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
784         queue->src_segment.position);
785     queue->src_tainted = FALSE;
786   }
787
788   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
789       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
790
791   if (queue->sinktime != GST_CLOCK_TIME_NONE
792       && queue->srctime != GST_CLOCK_TIME_NONE
793       && queue->sinktime >= queue->srctime)
794     queue->cur_level.time = queue->sinktime - queue->srctime;
795   else
796     queue->cur_level.time = 0;
797 }
798
799 /* take a SEGMENT event and apply the values to segment, updating the time
800  * level of queue. */
801 static void
802 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
803     gboolean is_sink)
804 {
805   gst_event_copy_segment (event, segment);
806
807   if (segment->format == GST_FORMAT_BYTES) {
808     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
809       /* start is where we'll be getting from and as such writing next */
810 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
811       queue->current = queue->read = add_range (queue, segment->start, TRUE);
812 #else
813       queue->current = add_range (queue, segment->start, TRUE);
814 #endif
815     }
816   }
817
818   /* now configure the values, we use these to track timestamps on the
819    * sinkpad. */
820   if (segment->format != GST_FORMAT_TIME) {
821     /* non-time format, pretend the current time segment is closed with a
822      * 0 start and unknown stop time. */
823     segment->format = GST_FORMAT_TIME;
824     segment->start = 0;
825     segment->stop = -1;
826     segment->time = 0;
827   }
828
829   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
830
831   if (is_sink)
832     queue->sink_tainted = TRUE;
833   else
834     queue->src_tainted = TRUE;
835
836   /* segment can update the time level of the queue */
837   update_time_level (queue);
838 }
839
840 static void
841 apply_gap (GstQueue2 * queue, GstEvent * event,
842     GstSegment * segment, gboolean is_sink)
843 {
844   GstClockTime timestamp;
845   GstClockTime duration;
846
847   gst_event_parse_gap (event, &timestamp, &duration);
848
849   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
850
851     if (GST_CLOCK_TIME_IS_VALID (duration)) {
852       timestamp += duration;
853     }
854
855     segment->position = timestamp;
856
857     if (is_sink)
858       queue->sink_tainted = TRUE;
859     else
860       queue->src_tainted = TRUE;
861
862     /* calc diff with other end */
863     update_time_level (queue);
864   }
865 }
866
867 static void
868 query_downstream_bitrate (GstQueue2 * queue)
869 {
870   GstQuery *query = gst_query_new_bitrate ();
871   guint downstream_bitrate = 0;
872
873   if (gst_pad_peer_query (queue->srcpad, query)) {
874     gst_query_parse_bitrate (query, &downstream_bitrate);
875     GST_DEBUG_OBJECT (queue, "Got bitrate of %u from downstream",
876         downstream_bitrate);
877   } else {
878     GST_DEBUG_OBJECT (queue, "Failed to query bitrate from downstream");
879   }
880
881   gst_query_unref (query);
882
883   GST_QUEUE2_MUTEX_LOCK (queue);
884   queue->downstream_bitrate = downstream_bitrate;
885   GST_QUEUE2_MUTEX_UNLOCK (queue);
886
887   g_object_notify (G_OBJECT (queue), "bitrate");
888 }
889
890 /* take a buffer and update segment, updating the time level of the queue. */
891 static void
892 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
893     guint64 size, gboolean is_sink)
894 {
895   GstClockTime duration, timestamp;
896
897   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
898   duration = GST_BUFFER_DURATION (buffer);
899
900   /* If we have no duration, pick one from the bitrate if we can */
901   if (duration == GST_CLOCK_TIME_NONE) {
902     if (queue->use_tags_bitrate) {
903       guint bitrate =
904           is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
905       if (bitrate)
906         duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
907     }
908     if (duration == GST_CLOCK_TIME_NONE && !is_sink && queue->use_bitrate_query) {
909       if (queue->downstream_bitrate > 0) {
910         duration =
911             gst_util_uint64_scale (size, 8 * GST_SECOND,
912             queue->downstream_bitrate);
913
914         GST_LOG_OBJECT (queue, "got bitrate %u resulting in estimated "
915             "duration %" GST_TIME_FORMAT, queue->downstream_bitrate,
916             GST_TIME_ARGS (duration));
917       }
918     }
919   }
920
921   /* if no timestamp is set, assume it's continuous with the previous
922    * time */
923   if (timestamp == GST_CLOCK_TIME_NONE)
924     timestamp = segment->position;
925
926   /* add duration */
927   if (duration != GST_CLOCK_TIME_NONE)
928     timestamp += duration;
929
930   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
931       GST_TIME_ARGS (timestamp));
932
933   segment->position = timestamp;
934
935   if (is_sink)
936     queue->sink_tainted = TRUE;
937   else
938     queue->src_tainted = TRUE;
939
940   /* calc diff with other end */
941   update_time_level (queue);
942 }
943
944 struct BufListData
945 {
946   GstClockTime timestamp;
947   guint bitrate;
948 };
949
950 static gboolean
951 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
952 {
953   struct BufListData *bld = data;
954   GstClockTime *timestamp = &bld->timestamp;
955   GstClockTime btime;
956
957   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
958       " duration %" GST_TIME_FORMAT, idx,
959       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
960       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
961       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
962
963   btime = GST_BUFFER_DTS_OR_PTS (*buf);
964   if (GST_CLOCK_TIME_IS_VALID (btime))
965     *timestamp = btime;
966
967   if (GST_BUFFER_DURATION_IS_VALID (*buf))
968     *timestamp += GST_BUFFER_DURATION (*buf);
969   else if (bld->bitrate != 0) {
970     guint64 size = gst_buffer_get_size (*buf);
971
972     /* If we have no duration, pick one from the bitrate if we can */
973     *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size);
974   }
975
976
977   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
978   return TRUE;
979 }
980
981 /* take a buffer list and update segment, updating the time level of the queue */
982 static void
983 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
984     GstSegment * segment, gboolean is_sink)
985 {
986   struct BufListData bld;
987
988   /* if no timestamp is set, assume it's continuous with the previous time */
989   bld.timestamp = segment->position;
990
991   bld.bitrate = 0;
992   if (queue->use_tags_bitrate) {
993     if (is_sink)
994       bld.bitrate = queue->sink_tags_bitrate;
995     else
996       bld.bitrate = queue->src_tags_bitrate;
997   }
998   if (!is_sink && bld.bitrate == 0 && queue->use_bitrate_query) {
999     bld.bitrate = queue->downstream_bitrate;
1000   }
1001
1002   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
1003
1004   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
1005       GST_TIME_ARGS (bld.timestamp));
1006
1007   segment->position = bld.timestamp;
1008
1009   if (is_sink)
1010     queue->sink_tainted = TRUE;
1011   else
1012     queue->src_tainted = TRUE;
1013
1014   /* calc diff with other end */
1015   update_time_level (queue);
1016 }
1017
1018 static inline gint
1019 normalize_to_buffering_level (guint64 cur_level, guint64 max_level,
1020     guint64 alt_max)
1021 {
1022   guint64 p;
1023
1024   if (max_level == 0)
1025     return 0;
1026
1027   if (alt_max > 0)
1028     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL,
1029         MIN (max_level, alt_max));
1030   else
1031     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level);
1032
1033   return MIN (p, MAX_BUFFERING_LEVEL);
1034 }
1035
1036 static gboolean
1037 get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
1038     gint * buffering_level)
1039 {
1040   gint buflevel, buflevel2;
1041
1042   if (queue->high_watermark <= 0) {
1043     if (buffering_level)
1044       *buffering_level = MAX_BUFFERING_LEVEL;
1045     if (is_buffering)
1046       *is_buffering = FALSE;
1047     return FALSE;
1048   }
1049 #define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
1050     normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
1051
1052   if (queue->is_eos || queue->srcresult == GST_FLOW_NOT_LINKED) {
1053     /* on EOS and NOT_LINKED we are always 100% full, we set the var
1054      * here so that we can reuse the logic below to stop buffering */
1055     buflevel = MAX_BUFFERING_LEVEL;
1056     GST_LOG_OBJECT (queue, "we are %s", queue->is_eos ? "EOS" : "NOT_LINKED");
1057   } else {
1058     GST_LOG_OBJECT (queue,
1059         "Cur level bytes/time/rate-time/buffers %u/%" GST_TIME_FORMAT "/%"
1060         GST_TIME_FORMAT "/%u", queue->cur_level.bytes,
1061         GST_TIME_ARGS (queue->cur_level.time),
1062         GST_TIME_ARGS (queue->cur_level.rate_time), queue->cur_level.buffers);
1063
1064     /* figure out the buffering level we are filled, we take the max of all formats. */
1065     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1066       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0);
1067     } else {
1068       guint64 rb_size = queue->ring_buffer_max_size;
1069       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size);
1070     }
1071
1072     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0);
1073     buflevel = MAX (buflevel, buflevel2);
1074
1075     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0);
1076     buflevel = MAX (buflevel, buflevel2);
1077
1078     /* also apply the rate estimate when we need to */
1079     if (queue->use_rate_estimate) {
1080       buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0);
1081       buflevel = MAX (buflevel, buflevel2);
1082     }
1083
1084     /* Don't get to 0% unless we're really empty */
1085     if (queue->cur_level.bytes > 0)
1086       buflevel = MAX (1, buflevel);
1087   }
1088 #undef GET_BUFFER_LEVEL_FOR_QUANTITY
1089
1090   if (is_buffering)
1091     *is_buffering = queue->is_buffering;
1092
1093   if (buffering_level)
1094     *buffering_level = buflevel;
1095
1096   GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering,
1097       buflevel);
1098
1099   return TRUE;
1100 }
1101
1102 static gint
1103 convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level)
1104 {
1105   int percent;
1106
1107   /* scale so that if buffering_level equals the high watermark,
1108    * the percentage is 100% */
1109   percent = buffering_level * 100 / queue->high_watermark;
1110   /* clip */
1111   if (percent > 100)
1112     percent = 100;
1113
1114   return percent;
1115 }
1116
1117 static void
1118 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
1119     gint * avg_in, gint * avg_out, gint64 * buffering_left)
1120 {
1121 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
1122   if (queue->mode != -1) {
1123     *mode = queue->mode;
1124   } else {
1125     if (mode) {
1126       if (!QUEUE_IS_USING_QUEUE (queue)) {
1127         if (QUEUE_IS_USING_RING_BUFFER (queue))
1128           *mode = GST_BUFFERING_TIMESHIFT;
1129         else
1130           *mode = GST_BUFFERING_DOWNLOAD;
1131       } else {
1132         *mode = GST_BUFFERING_STREAM;
1133       }
1134     }
1135   }
1136 #else
1137   if (mode) {
1138     if (!QUEUE_IS_USING_QUEUE (queue)) {
1139       if (QUEUE_IS_USING_RING_BUFFER (queue))
1140         *mode = GST_BUFFERING_TIMESHIFT;
1141       else
1142         *mode = GST_BUFFERING_DOWNLOAD;
1143     } else {
1144       *mode = GST_BUFFERING_STREAM;
1145     }
1146   }
1147 #endif
1148
1149   if (avg_in)
1150     *avg_in = queue->byte_in_rate;
1151   if (avg_out)
1152     *avg_out = queue->byte_out_rate;
1153
1154   if (buffering_left) {
1155     *buffering_left = (percent == 100 ? 0 : -1);
1156
1157     if (queue->use_rate_estimate) {
1158       guint64 max, cur;
1159
1160       max = queue->max_level.rate_time;
1161       cur = queue->cur_level.rate_time;
1162
1163       if (percent != 100 && max > cur)
1164         *buffering_left = (max - cur) / 1000000;
1165     }
1166   }
1167 }
1168
1169 /* Called with the lock taken */
1170 static GstMessage *
1171 gst_queue2_get_buffering_message (GstQueue2 * queue)
1172 {
1173   GstMessage *msg = NULL;
1174   if (queue->percent_changed) {
1175     /* Don't change the buffering level if the sinkpad is waiting for
1176      * space to become available.  This prevents the situation where,
1177      * upstream is pushing buffers larger than our limits so only 1 buffer
1178      * is ever in the queue at a time.
1179      * Changing the level causes a buffering message to be posted saying that
1180      * we are buffering which the application may pause to wait for another
1181      * 100% buffering message which would be posted very soon after the
1182      * waiting sink thread adds it's buffer to the queue */
1183     /* FIXME: This situation above can still occur later if
1184      * the sink pad is waiting to push a serialized event into the queue and
1185      * the queue becomes empty for a short period of time. */
1186     if (!queue->waiting_del
1187         && queue->last_posted_buffering_percent != queue->buffering_percent) {
1188       gint percent = queue->buffering_percent;
1189
1190       GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
1191       msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
1192
1193       gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
1194           queue->avg_out, queue->buffering_left);
1195
1196       queue->last_posted_buffering_percent = percent;
1197     }
1198     queue->percent_changed = FALSE;
1199   }
1200
1201   return msg;
1202 }
1203
1204 static void
1205 gst_queue2_post_buffering (GstQueue2 * queue)
1206 {
1207   GstMessage *msg = NULL;
1208
1209   g_mutex_lock (&queue->buffering_post_lock);
1210   GST_QUEUE2_MUTEX_LOCK (queue);
1211   msg = gst_queue2_get_buffering_message (queue);
1212   GST_QUEUE2_MUTEX_UNLOCK (queue);
1213
1214   if (msg != NULL)
1215     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
1216
1217   g_mutex_unlock (&queue->buffering_post_lock);
1218 }
1219
1220 static void
1221 update_buffering (GstQueue2 * queue)
1222 {
1223   gint buffering_level, percent;
1224 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1225   GstQueue2Range *range;
1226
1227   if (queue->read)
1228     range = queue->read;
1229   else
1230     range = queue->current;
1231 #endif
1232
1233   /* Ensure the variables used to calculate buffering state are up-to-date. */
1234 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1235   if (range)
1236     update_cur_level (queue, range);
1237 #else
1238   if (queue->current)
1239     update_cur_level (queue, queue->current);
1240 #endif
1241   update_in_rates (queue, FALSE);
1242
1243   if (!get_buffering_level (queue, NULL, &buffering_level))
1244     return;
1245
1246   percent = convert_to_buffering_percent (queue, buffering_level);
1247
1248   if (queue->is_buffering) {
1249     /* if we were buffering see if we reached the high watermark */
1250     if (percent >= 100)
1251       queue->is_buffering = FALSE;
1252
1253     SET_PERCENT (queue, percent);
1254   } else {
1255     /* we were not buffering, check if we need to start buffering if we drop
1256      * below the low threshold */
1257     if (buffering_level < queue->low_watermark) {
1258       queue->is_buffering = TRUE;
1259       SET_PERCENT (queue, percent);
1260     }
1261   }
1262 }
1263
1264 static void
1265 reset_rate_timer (GstQueue2 * queue)
1266 {
1267   queue->bytes_in = 0;
1268   queue->bytes_out = 0;
1269   queue->byte_in_rate = 0.0;
1270   queue->byte_in_period = 0;
1271   queue->byte_out_rate = 0.0;
1272   queue->last_update_in_rates_elapsed = 0.0;
1273   queue->last_in_elapsed = 0.0;
1274   queue->last_out_elapsed = 0.0;
1275   queue->in_timer_started = FALSE;
1276   queue->out_timer_started = FALSE;
1277 }
1278
1279 /* the interval in seconds to recalculate the rate */
1280 #define RATE_INTERVAL    0.2
1281 /* Tuning for rate estimation. We use a large window for the input rate because
1282  * it should be stable when connected to a network. The output rate is less
1283  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1284  * therefore adapt more quickly.
1285  * However, initial input rate may be subject to a burst, and should therefore
1286  * initially also adapt more quickly to changes, and only later on give higher
1287  * weight to previous values. */
1288 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1289 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1290
1291 static void
1292 update_in_rates (GstQueue2 * queue, gboolean force)
1293 {
1294   gdouble elapsed, period;
1295   gdouble byte_in_rate;
1296
1297   if (!queue->in_timer_started) {
1298     queue->in_timer_started = TRUE;
1299     g_timer_start (queue->in_timer);
1300     return;
1301   }
1302
1303   queue->last_update_in_rates_elapsed = elapsed =
1304       g_timer_elapsed (queue->in_timer, NULL);
1305
1306   /* recalc after each interval. */
1307   if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1308     period = elapsed - queue->last_in_elapsed;
1309
1310     GST_DEBUG_OBJECT (queue,
1311         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1312         period, queue->bytes_in, queue->byte_in_period);
1313
1314     byte_in_rate = queue->bytes_in / period;
1315
1316     if (queue->byte_in_rate == 0.0)
1317       queue->byte_in_rate = byte_in_rate;
1318     else
1319       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1320           (double) queue->byte_in_period, period);
1321
1322     /* another data point, cap at 16 for long time running average */
1323     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1324       queue->byte_in_period += period;
1325
1326     /* reset the values to calculate rate over the next interval */
1327     queue->last_in_elapsed = elapsed;
1328     queue->bytes_in = 0;
1329   }
1330
1331   if (queue->use_bitrate_query && queue->downstream_bitrate > 0) {
1332     queue->cur_level.rate_time =
1333         gst_util_uint64_scale (8 * queue->cur_level.bytes, GST_SECOND,
1334         queue->downstream_bitrate);
1335     GST_LOG_OBJECT (queue,
1336         "got bitrate %u with byte level %u resulting in time %"
1337         GST_TIME_FORMAT, queue->downstream_bitrate, queue->cur_level.bytes,
1338         GST_TIME_ARGS (queue->cur_level.rate_time));
1339   } else if (queue->byte_in_rate > 0.0) {
1340     queue->cur_level.rate_time =
1341         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1342   }
1343   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1344       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1345 }
1346
1347 static void
1348 update_out_rates (GstQueue2 * queue)
1349 {
1350   gdouble elapsed, period;
1351   gdouble byte_out_rate;
1352
1353   if (!queue->out_timer_started) {
1354     queue->out_timer_started = TRUE;
1355     g_timer_start (queue->out_timer);
1356     return;
1357   }
1358
1359   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1360
1361   /* recalc after each interval. */
1362   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1363     period = elapsed - queue->last_out_elapsed;
1364
1365     GST_DEBUG_OBJECT (queue,
1366         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1367
1368     byte_out_rate = queue->bytes_out / period;
1369
1370     if (queue->byte_out_rate == 0.0)
1371       queue->byte_out_rate = byte_out_rate;
1372     else
1373       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1374
1375     /* reset the values to calculate rate over the next interval */
1376     queue->last_out_elapsed = elapsed;
1377     queue->bytes_out = 0;
1378   }
1379   if (queue->byte_in_rate > 0.0) {
1380     queue->cur_level.rate_time =
1381         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1382   }
1383   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1384       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1385 }
1386
1387 static void
1388 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1389 {
1390   guint64 reading_pos, max_reading_pos;
1391
1392   reading_pos = pos;
1393   max_reading_pos = range->max_reading_pos;
1394
1395   max_reading_pos = MAX (max_reading_pos, reading_pos);
1396
1397   GST_DEBUG_OBJECT (queue,
1398       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1399       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1400   range->max_reading_pos = max_reading_pos;
1401
1402   update_cur_level (queue, range);
1403 }
1404
1405 static gboolean
1406 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1407 {
1408   GstEvent *event;
1409   gboolean res;
1410
1411   /* until we receive the FLUSH_STOP from this seek, we skip data */
1412   queue->seeking = TRUE;
1413   GST_QUEUE2_MUTEX_UNLOCK (queue);
1414
1415   debug_ranges (queue);
1416
1417   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1418
1419   event =
1420       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1421       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1422       GST_SEEK_TYPE_NONE, -1);
1423
1424   res = gst_pad_push_event (queue->sinkpad, event);
1425   GST_QUEUE2_MUTEX_LOCK (queue);
1426
1427   if (res) {
1428     /* Between us sending the seek event and re-acquiring the lock, the source
1429      * thread might already have pushed data and moved along the range's
1430      * writing_pos beyond the seek offset. In that case we don't want to set
1431      * the writing position back to the requested seek position, as it would
1432      * cause data to be written to the wrong offset in the file or ring buffer.
1433      * We still do the add_range call to switch the current range to the
1434      * requested range, or create one if one doesn't exist yet. */
1435 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1436     queue->current = queue->read = add_range (queue, offset, FALSE);
1437 #else
1438     queue->current = add_range (queue, offset, FALSE);
1439 #endif
1440   }
1441
1442   return res;
1443 }
1444
1445 /* get the threshold for when we decide to seek rather than wait */
1446 static guint64
1447 get_seek_threshold (GstQueue2 * queue)
1448 {
1449   guint64 threshold;
1450
1451   /* FIXME, find a good threshold based on the incoming rate. */
1452   threshold = 1024 * 512;
1453
1454   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1455     threshold = MIN (threshold,
1456         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1457   }
1458   return threshold;
1459 }
1460
1461 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1462 /* check the buffered data size, not to change range repeatedly.
1463  * changing range cause the new http connection and it makes buffering state frequently. */
1464 static gboolean
1465 change_current_range(GstQueue2 * queue, GstQueue2Range *req_range, guint64 offset, guint length)
1466 {
1467   guint64 curr_level = 0;
1468   guint64 curr_min_level = 0;
1469
1470   if (queue->current->writing_pos > queue->current->reading_pos)
1471     curr_level = queue->current->writing_pos - queue->current->reading_pos;
1472
1473   /* FIXME, find a good threshold based on the incoming rate and content bitrate. */
1474   curr_min_level = MIN((queue->max_level.bytes*0.05), get_seek_threshold(queue));
1475
1476   GST_DEBUG_OBJECT(queue, " curr[w%"G_GUINT64_FORMAT", r%"G_GUINT64_FORMAT
1477         ", d%"G_GUINT64_FORMAT", m%"G_GUINT64_FORMAT
1478         "] u[%"G_GUINT64_FORMAT"][EOS:%d]",
1479         queue->current->writing_pos, queue->current->reading_pos,
1480         curr_level, curr_min_level,
1481         queue->upstream_size, queue->is_eos);
1482
1483   /* FIXME, find a good threshold based on the incoming rate and content bitrate. */
1484   if ((queue->is_eos) ||
1485       (queue->upstream_size > 0 && queue->current->writing_pos >= queue->upstream_size) ||
1486       (curr_level >= curr_min_level)) {
1487     GST_DEBUG_OBJECT (queue, "Range Switching");
1488     return TRUE;
1489   } else {
1490     GST_DEBUG_OBJECT (queue, "keep receiving data without range switching.");
1491     return FALSE;
1492   }
1493 }
1494 #endif
1495
1496 /* see if there is enough data in the file to read a full buffer */
1497 static gboolean
1498 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1499 {
1500   GstQueue2Range *range;
1501 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1502   gboolean need_to_seek = TRUE;
1503 #endif
1504   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1505       offset, length);
1506
1507   if ((range = find_range (queue, offset))) {
1508 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1509     queue->read = range;
1510 #endif
1511     if (queue->current != range) {
1512       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1513 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1514       if (QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER(queue))
1515         need_to_seek = change_current_range(queue, range, offset, length);
1516
1517       if (need_to_seek == TRUE)
1518         perform_seek_to_offset (queue, range->writing_pos);
1519 #else
1520       perform_seek_to_offset (queue, range->writing_pos);
1521 #endif
1522     }
1523
1524     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1525         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1526
1527     /* we have a range for offset */
1528     GST_DEBUG_OBJECT (queue,
1529         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1530         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1531
1532     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1533       return TRUE;
1534
1535     if (offset + length <= range->writing_pos)
1536       return TRUE;
1537     else
1538       GST_DEBUG_OBJECT (queue,
1539           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1540           (offset + length) - range->writing_pos);
1541
1542   } else {
1543     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1544         " len %u", offset, length);
1545
1546 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1547     queue->read = queue->current;
1548 #endif
1549
1550     /* we don't have the range, see how far away we are */
1551     if (!queue->is_eos && queue->current) {
1552       guint64 threshold = get_seek_threshold (queue);
1553
1554       if (offset >= queue->current->offset && offset <=
1555           queue->current->writing_pos + threshold) {
1556         GST_INFO_OBJECT (queue,
1557             "requested data is within range, wait for data");
1558         return FALSE;
1559       }
1560     }
1561
1562     /* too far away, do a seek */
1563     perform_seek_to_offset (queue, offset);
1564   }
1565
1566   return FALSE;
1567 }
1568
1569 #ifdef HAVE_FSEEKO
1570 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1571 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1572 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1573 #else
1574 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1575 #endif
1576
1577 static GstFlowReturn
1578 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1579     guint8 * dst, gint64 * read_return)
1580 {
1581   guint8 *ring_buffer;
1582   size_t res;
1583
1584   ring_buffer = queue->ring_buffer;
1585
1586   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1587     goto seek_failed;
1588
1589   /* this should not block */
1590   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1591       length, offset);
1592   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1593     res = fread (dst, 1, length, queue->temp_file);
1594   } else {
1595     memcpy (dst, ring_buffer + offset, length);
1596     res = length;
1597   }
1598
1599   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1600
1601   if (G_UNLIKELY (res < length)) {
1602     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1603       goto could_not_read;
1604     /* check for errors or EOF */
1605     if (ferror (queue->temp_file))
1606       goto could_not_read;
1607     if (feof (queue->temp_file) && length > 0)
1608       goto eos;
1609   }
1610
1611   *read_return = res;
1612
1613   return GST_FLOW_OK;
1614
1615 seek_failed:
1616   {
1617     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1618     return GST_FLOW_ERROR;
1619   }
1620 could_not_read:
1621   {
1622     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1623     return GST_FLOW_ERROR;
1624   }
1625 eos:
1626   {
1627     GST_DEBUG ("non-regular file hits EOS");
1628     return GST_FLOW_EOS;
1629   }
1630 }
1631
1632 static GstFlowReturn
1633 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1634     GstBuffer ** buffer)
1635 {
1636   GstBuffer *buf;
1637   GstMapInfo info;
1638   guint8 *data;
1639   guint64 file_offset;
1640   guint block_length, remaining, read_length;
1641   guint64 rb_size;
1642   guint64 max_size;
1643   guint64 rpos;
1644   GstFlowReturn ret = GST_FLOW_OK;
1645
1646   /* allocate the output buffer of the requested size */
1647   if (*buffer == NULL)
1648     buf = gst_buffer_new_allocate (NULL, length, NULL);
1649   else
1650     buf = *buffer;
1651
1652   if (!gst_buffer_map (buf, &info, GST_MAP_WRITE))
1653     goto buffer_write_fail;
1654   data = info.data;
1655
1656   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1657       offset);
1658
1659   rpos = offset;
1660   rb_size = queue->ring_buffer_max_size;
1661   max_size = QUEUE_MAX_BYTES (queue);
1662
1663   remaining = length;
1664   while (remaining > 0) {
1665     /* configure how much/whether to read */
1666     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1667       read_length = 0;
1668
1669       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1670         guint64 level;
1671
1672         /* calculate how far away the offset is */
1673 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1674         if (queue->read->writing_pos > rpos)
1675           level = queue->read->writing_pos - rpos;
1676 #else
1677         if (queue->current->writing_pos > rpos)
1678           level = queue->current->writing_pos - rpos;
1679 #endif
1680         else
1681           level = 0;
1682
1683         GST_DEBUG_OBJECT (queue,
1684             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1685             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1686 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1687             rpos, queue->read->writing_pos, level, max_size);
1688 #else
1689             rpos, queue->current->writing_pos, level, max_size);
1690 #endif
1691
1692         if (level >= max_size) {
1693           /* we don't have the data but if we have a ring buffer that is full, we
1694            * need to read */
1695           GST_DEBUG_OBJECT (queue,
1696               "ring buffer full, reading QUEUE_MAX_BYTES %"
1697               G_GUINT64_FORMAT " bytes", max_size);
1698           read_length = max_size;
1699         } else if (queue->is_eos) {
1700           /* won't get any more data so read any data we have */
1701           if (level) {
1702             GST_DEBUG_OBJECT (queue,
1703                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1704                 level);
1705             read_length = level;
1706             remaining = level;
1707             length = level;
1708           } else
1709             goto hit_eos;
1710         }
1711       }
1712
1713       if (read_length == 0) {
1714         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1715 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1716           GST_DEBUG_OBJECT (queue,
1717               "update current position [%" G_GUINT64_FORMAT "-%"
1718               G_GUINT64_FORMAT "]", rpos, queue->read->max_reading_pos);
1719           update_cur_pos (queue, queue->read, rpos);
1720 #else
1721           GST_DEBUG_OBJECT (queue,
1722               "update current position [%" G_GUINT64_FORMAT "-%"
1723               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1724           update_cur_pos (queue, queue->current, rpos);
1725 #endif
1726           GST_QUEUE2_SIGNAL_DEL (queue);
1727         }
1728
1729         if (queue->use_buffering)
1730           update_buffering (queue);
1731
1732         GST_DEBUG_OBJECT (queue, "waiting for add");
1733         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1734         continue;
1735       }
1736     } else {
1737       /* we have the requested data so read it */
1738       read_length = remaining;
1739     }
1740
1741     /* set range reading_pos to actual reading position for this read */
1742 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1743     queue->read->reading_pos = rpos;
1744 #else
1745     queue->current->reading_pos = rpos;
1746 #endif
1747
1748     /* configure how much and from where to read */
1749     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1750       file_offset =
1751 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1752           (queue->read->rb_offset + (rpos -
1753               queue->read->offset)) % rb_size;
1754 #else
1755           (queue->current->rb_offset + (rpos -
1756               queue->current->offset)) % rb_size;
1757 #endif
1758       if (file_offset + read_length > rb_size) {
1759         block_length = rb_size - file_offset;
1760       } else {
1761         block_length = read_length;
1762       }
1763     } else {
1764       file_offset = rpos;
1765       block_length = read_length;
1766     }
1767
1768     /* while we still have data to read, we loop */
1769     while (read_length > 0) {
1770       gint64 read_return;
1771
1772       ret =
1773           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1774           data, &read_return);
1775       if (ret != GST_FLOW_OK)
1776         goto read_error;
1777
1778       file_offset += read_return;
1779       if (QUEUE_IS_USING_RING_BUFFER (queue))
1780         file_offset %= rb_size;
1781
1782       data += read_return;
1783       read_length -= read_return;
1784       block_length = read_length;
1785       remaining -= read_return;
1786
1787 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1788       rpos = (queue->read->reading_pos += read_return);
1789       update_cur_pos (queue, queue->read, queue->read->reading_pos);
1790 #else
1791       rpos = (queue->current->reading_pos += read_return);
1792       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1793 #endif
1794     }
1795     GST_QUEUE2_SIGNAL_DEL (queue);
1796     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1797   }
1798
1799   gst_buffer_unmap (buf, &info);
1800   gst_buffer_resize (buf, 0, length);
1801
1802   GST_BUFFER_OFFSET (buf) = offset;
1803   GST_BUFFER_OFFSET_END (buf) = offset + length;
1804
1805   *buffer = buf;
1806
1807   return ret;
1808
1809   /* ERRORS */
1810 hit_eos:
1811   {
1812     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1813     gst_buffer_unmap (buf, &info);
1814     if (*buffer == NULL)
1815       gst_buffer_unref (buf);
1816     return GST_FLOW_EOS;
1817   }
1818 out_flushing:
1819   {
1820     GST_DEBUG_OBJECT (queue, "we are flushing");
1821     gst_buffer_unmap (buf, &info);
1822     if (*buffer == NULL)
1823       gst_buffer_unref (buf);
1824     return GST_FLOW_FLUSHING;
1825   }
1826 read_error:
1827   {
1828     GST_DEBUG_OBJECT (queue, "we have a read error");
1829     gst_buffer_unmap (buf, &info);
1830     if (*buffer == NULL)
1831       gst_buffer_unref (buf);
1832     return ret;
1833   }
1834 buffer_write_fail:
1835   {
1836     GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL),
1837         ("Can't write to buffer"));
1838     if (*buffer == NULL)
1839       gst_buffer_unref (buf);
1840     return GST_FLOW_ERROR;
1841   }
1842 }
1843
1844 /* should be called with QUEUE_LOCK */
1845 static GstMiniObject *
1846 gst_queue2_read_item_from_file (GstQueue2 * queue)
1847 {
1848   GstMiniObject *item;
1849
1850   if (queue->stream_start_event != NULL) {
1851     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1852     queue->stream_start_event = NULL;
1853   } else if (queue->starting_segment != NULL) {
1854     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1855     queue->starting_segment = NULL;
1856   } else {
1857     GstFlowReturn ret;
1858     GstBuffer *buffer = NULL;
1859     guint64 reading_pos;
1860
1861 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1862     reading_pos = queue->read->reading_pos;
1863 #else
1864     reading_pos = queue->current->reading_pos;
1865 #endif
1866
1867     ret =
1868         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1869         &buffer);
1870
1871     switch (ret) {
1872       case GST_FLOW_OK:
1873         item = GST_MINI_OBJECT_CAST (buffer);
1874         break;
1875       case GST_FLOW_EOS:
1876         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1877         break;
1878       default:
1879         item = NULL;
1880         break;
1881     }
1882   }
1883   return item;
1884 }
1885
1886 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1887  * the temp filename. */
1888 static gboolean
1889 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1890 {
1891   gint fd = -1;
1892   gchar *name = NULL;
1893
1894   if (queue->temp_file)
1895     goto already_opened;
1896
1897   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1898
1899   /* If temp_template was set, allocate a filename and open that file */
1900
1901   /* nothing to do */
1902   if (queue->temp_template == NULL)
1903     goto no_directory;
1904
1905   /* make copy of the template, we don't want to change this */
1906   name = g_strdup (queue->temp_template);
1907
1908 #ifdef __BIONIC__
1909   fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1910 #else
1911   fd = g_mkstemp (name);
1912 #endif
1913
1914   if (fd == -1)
1915     goto mkstemp_failed;
1916
1917   /* open the file for update/writing */
1918   queue->temp_file = fdopen (fd, "wb+");
1919   /* error creating file */
1920   if (queue->temp_file == NULL)
1921     goto open_failed;
1922
1923   g_free (queue->temp_location);
1924   queue->temp_location = name;
1925
1926   GST_QUEUE2_MUTEX_UNLOCK (queue);
1927
1928   /* we can't emit the notify with the lock */
1929   g_object_notify (G_OBJECT (queue), "temp-location");
1930
1931   GST_QUEUE2_MUTEX_LOCK (queue);
1932
1933   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1934
1935   return TRUE;
1936
1937   /* ERRORS */
1938 already_opened:
1939   {
1940     GST_DEBUG_OBJECT (queue, "temp file was already open");
1941     return TRUE;
1942   }
1943 no_directory:
1944   {
1945     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1946         (_("No Temp directory specified.")), (NULL));
1947     return FALSE;
1948   }
1949 mkstemp_failed:
1950   {
1951     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1952         (_("Could not create temp file \"%s\"."), queue->temp_template),
1953         GST_ERROR_SYSTEM);
1954     g_free (name);
1955     return FALSE;
1956   }
1957 open_failed:
1958   {
1959     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1960         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1961     g_free (name);
1962     if (fd != -1)
1963       close (fd);
1964     return FALSE;
1965   }
1966 }
1967
1968 static void
1969 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1970 {
1971   /* nothing to do */
1972   if (queue->temp_file == NULL)
1973     return;
1974
1975   GST_DEBUG_OBJECT (queue, "closing temp file");
1976
1977   fflush (queue->temp_file);
1978   fclose (queue->temp_file);
1979
1980   if (queue->temp_remove) {
1981     if (remove (queue->temp_location) < 0) {
1982       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1983           queue->temp_location, g_strerror (errno));
1984     }
1985   }
1986
1987   queue->temp_file = NULL;
1988   clean_ranges (queue);
1989 }
1990
1991 static void
1992 gst_queue2_flush_temp_file (GstQueue2 * queue)
1993 {
1994   if (queue->temp_file == NULL)
1995     return;
1996
1997   GST_DEBUG_OBJECT (queue, "flushing temp file");
1998
1999   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
2000 }
2001
2002 static void
2003 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
2004 {
2005   if (!QUEUE_IS_USING_QUEUE (queue)) {
2006     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
2007       gst_queue2_flush_temp_file (queue);
2008     init_ranges (queue);
2009   } else {
2010     GstQueue2Item *qitem;
2011
2012     while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
2013       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
2014           && GST_EVENT_IS_STICKY (qitem->item)
2015           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
2016           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
2017         gst_pad_store_sticky_event (queue->srcpad,
2018             GST_EVENT_CAST (qitem->item));
2019       }
2020
2021       /* Then lose another reference because we are supposed to destroy that
2022          data when flushing */
2023       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
2024         gst_mini_object_unref (qitem->item);
2025     }
2026   }
2027   queue->last_query = FALSE;
2028   g_cond_signal (&queue->query_handled);
2029   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2030   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
2031   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
2032   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
2033   queue->sink_tainted = queue->src_tainted = TRUE;
2034   if (queue->starting_segment != NULL)
2035     gst_event_unref (queue->starting_segment);
2036   queue->starting_segment = NULL;
2037   queue->segment_event_received = FALSE;
2038   gst_event_replace (&queue->stream_start_event, NULL);
2039
2040   /* we deleted a lot of something */
2041   GST_QUEUE2_SIGNAL_DEL (queue);
2042 }
2043
2044 static gboolean
2045 gst_queue2_wait_free_space (GstQueue2 * queue)
2046 {
2047   /* We make space available if we're "full" according to whatever
2048    * the user defined as "full". */
2049   if (gst_queue2_is_filled (queue)) {
2050     gboolean started;
2051
2052     /* pause the timer while we wait. The fact that we are waiting does not mean
2053      * the byterate on the input pad is lower */
2054     if ((started = queue->in_timer_started))
2055       g_timer_stop (queue->in_timer);
2056
2057     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2058         "queue is full, waiting for free space");
2059     do {
2060       /* Wait for space to be available, we could be unlocked because of a flush. */
2061       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
2062     }
2063     while (gst_queue2_is_filled (queue));
2064
2065     /* and continue if we were running before */
2066     if (started)
2067       g_timer_continue (queue->in_timer);
2068   }
2069   return TRUE;
2070
2071   /* ERRORS */
2072 out_flushing:
2073   {
2074     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
2075     return FALSE;
2076   }
2077 }
2078
2079 static gboolean
2080 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
2081 {
2082   GstMapInfo info;
2083   guint8 *data, *ring_buffer;
2084   guint size, rb_size;
2085   guint64 writing_pos, new_writing_pos;
2086   GstQueue2Range *range, *prev, *next;
2087   gboolean do_seek = FALSE;
2088
2089   if (QUEUE_IS_USING_RING_BUFFER (queue))
2090     writing_pos = queue->current->rb_writing_pos;
2091   else
2092     writing_pos = queue->current->writing_pos;
2093   ring_buffer = queue->ring_buffer;
2094   rb_size = queue->ring_buffer_max_size;
2095
2096   if (!gst_buffer_map (buffer, &info, GST_MAP_READ))
2097     goto buffer_read_error;
2098
2099   size = info.size;
2100   data = info.data;
2101
2102   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
2103       writing_pos);
2104
2105   /* sanity check */
2106   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
2107       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
2108     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
2109         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
2110         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
2111   }
2112
2113   while (size > 0) {
2114     guint to_write;
2115
2116     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2117       gint64 space;
2118
2119       /* calculate the space in the ring buffer not used by data from
2120        * the current range */
2121       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
2122         /* wait until there is some free space */
2123         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
2124       }
2125       /* get the amount of space we have */
2126       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
2127
2128       /* calculate if we need to split or if we can write the entire
2129        * buffer now */
2130       to_write = MIN (size, space);
2131
2132       /* the writing position in the ring buffer after writing (part
2133        * or all of) the buffer */
2134       new_writing_pos = (writing_pos + to_write) % rb_size;
2135
2136       prev = NULL;
2137       range = queue->ranges;
2138
2139       /* if we need to overwrite data in the ring buffer, we need to
2140        * update the ranges
2141        *
2142        * warning: this code is complicated and includes some
2143        * simplifications - pen, paper and diagrams for the cases
2144        * recommended! */
2145       while (range) {
2146         guint64 range_data_start, range_data_end;
2147         GstQueue2Range *range_to_destroy = NULL;
2148
2149 #ifndef TIZEN_FEATURE_QUEUE2_MODIFICATION
2150         if (range == queue->current)
2151           goto next_range;
2152 #endif
2153
2154         range_data_start = range->rb_offset;
2155         range_data_end = range->rb_writing_pos;
2156
2157         /* handle the special case where the range has no data in it */
2158         if (range->writing_pos == range->offset) {
2159           if (range != queue->current) {
2160             GST_DEBUG_OBJECT (queue,
2161                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
2162                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
2163             /* remove range */
2164             range_to_destroy = range;
2165             if (prev)
2166               prev->next = range->next;
2167           }
2168           goto next_range;
2169         }
2170
2171         if (range_data_end > range_data_start) {
2172           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
2173             goto next_range;
2174
2175           if (new_writing_pos > range_data_start) {
2176 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
2177             if (new_writing_pos >= range_data_end && range != queue->current) {
2178 #else
2179             if (new_writing_pos >= range_data_end) {
2180 #endif
2181               GST_DEBUG_OBJECT (queue,
2182                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
2183                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
2184               /* remove range */
2185               range_to_destroy = range;
2186               if (prev)
2187                 prev->next = range->next;
2188             } else {
2189               GST_DEBUG_OBJECT (queue,
2190                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
2191                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
2192                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
2193                   range->offset + new_writing_pos - range_data_start,
2194                   new_writing_pos);
2195               range->offset += (new_writing_pos - range_data_start);
2196               range->rb_offset = new_writing_pos;
2197             }
2198           }
2199         } else {
2200           guint64 new_wpos_virt = writing_pos + to_write;
2201
2202           if (new_wpos_virt <= range_data_start)
2203             goto next_range;
2204
2205 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
2206           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end
2207             && range != queue->current) {
2208 #else
2209           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
2210 #endif
2211             GST_DEBUG_OBJECT (queue,
2212                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
2213                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
2214             /* remove range */
2215             range_to_destroy = range;
2216             if (prev)
2217               prev->next = range->next;
2218           } else {
2219             GST_DEBUG_OBJECT (queue,
2220                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
2221                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
2222                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
2223                 range->offset + new_writing_pos - range_data_start,
2224                 new_writing_pos);
2225             range->offset += (new_wpos_virt - range_data_start);
2226             range->rb_offset = new_writing_pos;
2227           }
2228         }
2229
2230       next_range:
2231         if (!range_to_destroy)
2232           prev = range;
2233
2234         range = range->next;
2235         if (range_to_destroy) {
2236           if (range_to_destroy == queue->ranges)
2237             queue->ranges = range;
2238           g_slice_free (GstQueue2Range, range_to_destroy);
2239           range_to_destroy = NULL;
2240         }
2241       }
2242     } else {
2243       to_write = size;
2244       new_writing_pos = writing_pos + to_write;
2245     }
2246
2247     if (QUEUE_IS_USING_TEMP_FILE (queue)
2248         && FSEEK_FILE (queue->temp_file, writing_pos))
2249       goto seek_failed;
2250
2251     if (new_writing_pos > writing_pos) {
2252       GST_INFO_OBJECT (queue,
2253           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
2254           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
2255           queue->current->writing_pos, queue->current->rb_writing_pos);
2256       /* either not using ring buffer or no wrapping, just write */
2257       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2258         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
2259           goto handle_error;
2260       } else {
2261         memcpy (ring_buffer + writing_pos, data, to_write);
2262       }
2263
2264       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
2265         /* try to merge with next range */
2266         while ((next = queue->current->next)) {
2267           GST_INFO_OBJECT (queue,
2268               "checking merge with next range %" G_GUINT64_FORMAT " < %"
2269               G_GUINT64_FORMAT, new_writing_pos, next->offset);
2270           if (new_writing_pos < next->offset)
2271             break;
2272
2273           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
2274               next->writing_pos);
2275
2276           /* remove the group */
2277           queue->current->next = next->next;
2278
2279           /* We use the threshold to decide if we want to do a seek or simply
2280            * read the data again. If there is not so much data in the range we
2281            * prefer to avoid to seek and read it again. */
2282           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
2283             /* the new range had more data than the threshold, it's worth keeping
2284              * it and doing a seek. */
2285             new_writing_pos = next->writing_pos;
2286             do_seek = TRUE;
2287           }
2288           g_slice_free (GstQueue2Range, next);
2289         }
2290         goto update_and_signal;
2291       }
2292     } else {
2293       /* wrapping */
2294       guint block_one, block_two;
2295
2296       block_one = rb_size - writing_pos;
2297       block_two = to_write - block_one;
2298
2299       if (block_one > 0) {
2300         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
2301         /* write data to end of ring buffer */
2302         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2303           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
2304             goto handle_error;
2305         } else {
2306           memcpy (ring_buffer + writing_pos, data, block_one);
2307         }
2308       }
2309
2310       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
2311         goto seek_failed;
2312
2313       if (block_two > 0) {
2314         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
2315         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2316           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
2317             goto handle_error;
2318         } else {
2319           memcpy (ring_buffer, data + block_one, block_two);
2320         }
2321       }
2322     }
2323
2324   update_and_signal:
2325     /* update the writing positions */
2326     size -= to_write;
2327     GST_INFO_OBJECT (queue,
2328         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
2329         to_write, writing_pos, size);
2330
2331     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2332       data += to_write;
2333       queue->current->writing_pos += to_write;
2334       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
2335     } else {
2336       queue->current->writing_pos = writing_pos = new_writing_pos;
2337 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
2338       if (do_seek) {
2339         if (queue->upstream_size == 0 || new_writing_pos < queue->upstream_size)
2340           perform_seek_to_offset (queue, new_writing_pos);
2341         else
2342           queue->is_eos = TRUE;
2343       }
2344 #endif
2345     }
2346
2347 #ifndef TIZEN_FEATURE_QUEUE2_MODIFICATION
2348     if (do_seek)
2349       perform_seek_to_offset (queue, new_writing_pos);
2350 #endif
2351
2352     update_cur_level (queue, queue->current);
2353
2354     /* update the buffering status */
2355     if (queue->use_buffering) {
2356       GstMessage *msg;
2357       update_buffering (queue);
2358       msg = gst_queue2_get_buffering_message (queue);
2359       if (msg) {
2360         GST_QUEUE2_MUTEX_UNLOCK (queue);
2361         g_mutex_lock (&queue->buffering_post_lock);
2362         gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
2363         g_mutex_unlock (&queue->buffering_post_lock);
2364         GST_QUEUE2_MUTEX_LOCK (queue);
2365       }
2366     }
2367
2368     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2369         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2370
2371     GST_QUEUE2_SIGNAL_ADD (queue);
2372   }
2373
2374   gst_buffer_unmap (buffer, &info);
2375
2376   return TRUE;
2377
2378   /* ERRORS */
2379 out_flushing:
2380   {
2381     GST_DEBUG_OBJECT (queue, "we are flushing");
2382     gst_buffer_unmap (buffer, &info);
2383     /* FIXME - GST_FLOW_EOS ? */
2384     return FALSE;
2385   }
2386 seek_failed:
2387   {
2388     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2389     gst_buffer_unmap (buffer, &info);
2390     return FALSE;
2391   }
2392 handle_error:
2393   {
2394     switch (errno) {
2395       case ENOSPC:{
2396         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2397         break;
2398       }
2399       default:{
2400         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2401             (_("Error while writing to download file.")),
2402             ("%s", g_strerror (errno)));
2403       }
2404     }
2405     gst_buffer_unmap (buffer, &info);
2406     return FALSE;
2407   }
2408 buffer_read_error:
2409   {
2410     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL),
2411         ("Can't read from buffer"));
2412     return FALSE;
2413   }
2414 }
2415
2416 static gboolean
2417 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2418 {
2419   GstQueue2 *queue = q;
2420
2421   GST_TRACE_OBJECT (queue,
2422       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2423       gst_buffer_get_size (*buf));
2424
2425   if (!gst_queue2_create_write (queue, *buf)) {
2426     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2427     return FALSE;
2428   }
2429   return TRUE;
2430 }
2431
2432 /* enqueue an item an update the level stats */
2433 static void
2434 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2435     GstQueue2ItemType item_type)
2436 {
2437   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2438     GstBuffer *buffer;
2439     guint size;
2440
2441     buffer = GST_BUFFER_CAST (item);
2442     size = gst_buffer_get_size (buffer);
2443
2444     /* add buffer to the statistics */
2445     if (QUEUE_IS_USING_QUEUE (queue)) {
2446       queue->cur_level.buffers++;
2447       queue->cur_level.bytes += size;
2448     }
2449     queue->bytes_in += size;
2450
2451     /* apply new buffer to segment stats */
2452     apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE);
2453     /* update the byterate stats */
2454     update_in_rates (queue, FALSE);
2455
2456     if (!QUEUE_IS_USING_QUEUE (queue)) {
2457       /* FIXME - check return value? */
2458       gst_queue2_create_write (queue, buffer);
2459     }
2460   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2461     GstBufferList *buffer_list;
2462     guint size;
2463
2464     buffer_list = GST_BUFFER_LIST_CAST (item);
2465
2466     size = gst_buffer_list_calculate_size (buffer_list);
2467     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2468
2469     /* add buffer to the statistics */
2470     if (QUEUE_IS_USING_QUEUE (queue)) {
2471       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2472       queue->cur_level.bytes += size;
2473     }
2474     queue->bytes_in += size;
2475
2476     /* apply new buffer to segment stats */
2477     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2478
2479     /* update the byterate stats */
2480     update_in_rates (queue, FALSE);
2481
2482     if (!QUEUE_IS_USING_QUEUE (queue)) {
2483       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2484     }
2485   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2486     GstEvent *event;
2487
2488     event = GST_EVENT_CAST (item);
2489
2490     switch (GST_EVENT_TYPE (event)) {
2491       case GST_EVENT_EOS:
2492         /* Zero the thresholds, this makes sure the queue is completely
2493          * filled and we can read all data from the queue. */
2494         GST_DEBUG_OBJECT (queue, "we have EOS");
2495         queue->is_eos = TRUE;
2496         /* Force updating the input bitrate */
2497         update_in_rates (queue, TRUE);
2498         break;
2499       case GST_EVENT_SEGMENT:
2500         apply_segment (queue, event, &queue->sink_segment, TRUE);
2501         /* This is our first new segment, we hold it
2502          * as we can't save it on the temp file */
2503         if (!QUEUE_IS_USING_QUEUE (queue)) {
2504           if (queue->segment_event_received)
2505             goto unexpected_event;
2506
2507           queue->segment_event_received = TRUE;
2508           if (queue->starting_segment != NULL)
2509             gst_event_unref (queue->starting_segment);
2510           queue->starting_segment = event;
2511           item = NULL;
2512         }
2513         /* a new segment allows us to accept more buffers if we got EOS
2514          * from downstream */
2515         queue->unexpected = FALSE;
2516         break;
2517       case GST_EVENT_GAP:
2518         apply_gap (queue, event, &queue->sink_segment, TRUE);
2519         break;
2520       case GST_EVENT_STREAM_START:
2521         if (!QUEUE_IS_USING_QUEUE (queue)) {
2522           gst_event_replace (&queue->stream_start_event, event);
2523           gst_event_unref (event);
2524           item = NULL;
2525         }
2526         break;
2527       case GST_EVENT_CAPS:{
2528         GstCaps *caps;
2529
2530         gst_event_parse_caps (event, &caps);
2531         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2532
2533         if (!QUEUE_IS_USING_QUEUE (queue)) {
2534           GST_LOG ("Dropping caps event, not using queue");
2535           gst_event_unref (event);
2536           item = NULL;
2537         }
2538         break;
2539       }
2540       default:
2541         if (!QUEUE_IS_USING_QUEUE (queue))
2542           goto unexpected_event;
2543         break;
2544     }
2545   } else if (GST_IS_QUERY (item)) {
2546     /* Can't happen as we check that in the caller */
2547     if (!QUEUE_IS_USING_QUEUE (queue))
2548       g_assert_not_reached ();
2549   } else {
2550     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2551         item, GST_OBJECT_NAME (queue));
2552     /* we can't really unref since we don't know what it is */
2553     item = NULL;
2554   }
2555
2556   if (item) {
2557     /* update the buffering status */
2558     if (queue->use_buffering)
2559       update_buffering (queue);
2560
2561     if (QUEUE_IS_USING_QUEUE (queue)) {
2562       GstQueue2Item qitem;
2563
2564       qitem.type = item_type;
2565       qitem.item = item;
2566       gst_queue_array_push_tail_struct (queue->queue, &qitem);
2567     } else {
2568       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2569     }
2570
2571     GST_QUEUE2_SIGNAL_ADD (queue);
2572   }
2573
2574   return;
2575
2576   /* ERRORS */
2577 unexpected_event:
2578   {
2579     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2580
2581     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2582         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2583         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2584     gst_event_unref (GST_EVENT_CAST (item));
2585     return;
2586   }
2587 }
2588
2589 /* dequeue an item from the queue and update level stats */
2590 static GstMiniObject *
2591 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2592 {
2593   GstMiniObject *item;
2594
2595   if (!QUEUE_IS_USING_QUEUE (queue)) {
2596     item = gst_queue2_read_item_from_file (queue);
2597   } else {
2598     GstQueue2Item *qitem = gst_queue_array_pop_head_struct (queue->queue);
2599
2600     if (qitem == NULL)
2601       goto no_item;
2602
2603     item = qitem->item;
2604   }
2605
2606   if (item == NULL)
2607     goto no_item;
2608
2609   if (GST_IS_BUFFER (item)) {
2610     GstBuffer *buffer;
2611     guint size;
2612
2613     buffer = GST_BUFFER_CAST (item);
2614     size = gst_buffer_get_size (buffer);
2615     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2616
2617     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2618         "retrieved buffer %p from queue", buffer);
2619
2620     if (QUEUE_IS_USING_QUEUE (queue)) {
2621       queue->cur_level.buffers--;
2622       queue->cur_level.bytes -= size;
2623     }
2624     queue->bytes_out += size;
2625
2626     apply_buffer (queue, buffer, &queue->src_segment, size, FALSE);
2627     /* update the byterate stats */
2628     update_out_rates (queue);
2629     /* update the buffering */
2630     if (queue->use_buffering)
2631       update_buffering (queue);
2632
2633   } else if (GST_IS_EVENT (item)) {
2634     GstEvent *event = GST_EVENT_CAST (item);
2635
2636     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2637
2638     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2639         "retrieved event %p from queue", event);
2640
2641     switch (GST_EVENT_TYPE (event)) {
2642       case GST_EVENT_EOS:
2643         /* queue is empty now that we dequeued the EOS */
2644         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2645         break;
2646       case GST_EVENT_SEGMENT:
2647         apply_segment (queue, event, &queue->src_segment, FALSE);
2648         break;
2649       case GST_EVENT_GAP:
2650         apply_gap (queue, event, &queue->src_segment, FALSE);
2651         break;
2652       default:
2653         break;
2654     }
2655   } else if (GST_IS_BUFFER_LIST (item)) {
2656     GstBufferList *buffer_list;
2657     guint size;
2658
2659     buffer_list = GST_BUFFER_LIST_CAST (item);
2660     size = gst_buffer_list_calculate_size (buffer_list);
2661     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2662
2663     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2664         "retrieved buffer list %p from queue", buffer_list);
2665
2666     if (QUEUE_IS_USING_QUEUE (queue)) {
2667       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2668       queue->cur_level.bytes -= size;
2669     }
2670     queue->bytes_out += size;
2671
2672     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2673     /* update the byterate stats */
2674     update_out_rates (queue);
2675     /* update the buffering */
2676     if (queue->use_buffering)
2677       update_buffering (queue);
2678   } else if (GST_IS_QUERY (item)) {
2679     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2680         "retrieved query %p from queue", item);
2681     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2682   } else {
2683     g_warning
2684         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2685         item, GST_OBJECT_NAME (queue));
2686     item = NULL;
2687     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2688   }
2689   GST_QUEUE2_SIGNAL_DEL (queue);
2690
2691   return item;
2692
2693   /* ERRORS */
2694 no_item:
2695   {
2696     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2697     return NULL;
2698   }
2699 }
2700
2701 static GstFlowReturn
2702 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2703     GstEvent * event)
2704 {
2705   gboolean ret = TRUE;
2706   GstQueue2 *queue;
2707
2708   queue = GST_QUEUE2 (parent);
2709
2710   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
2711       GST_EVENT_TYPE_NAME (event));
2712
2713   switch (GST_EVENT_TYPE (event)) {
2714     case GST_EVENT_FLUSH_START:
2715     {
2716       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2717         /* forward event */
2718         ret = gst_pad_push_event (queue->srcpad, event);
2719
2720         /* now unblock the chain function */
2721         GST_QUEUE2_MUTEX_LOCK (queue);
2722         queue->srcresult = GST_FLOW_FLUSHING;
2723         queue->sinkresult = GST_FLOW_FLUSHING;
2724         /* unblock the loop and chain functions */
2725         GST_QUEUE2_SIGNAL_ADD (queue);
2726         GST_QUEUE2_SIGNAL_DEL (queue);
2727         GST_QUEUE2_MUTEX_UNLOCK (queue);
2728
2729         /* make sure it pauses, this should happen since we sent
2730          * flush_start downstream. */
2731         gst_pad_pause_task (queue->srcpad);
2732         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2733
2734         GST_QUEUE2_MUTEX_LOCK (queue);
2735         queue->last_query = FALSE;
2736         g_cond_signal (&queue->query_handled);
2737         GST_QUEUE2_MUTEX_UNLOCK (queue);
2738       } else {
2739         GST_QUEUE2_MUTEX_LOCK (queue);
2740         /* flush the sink pad */
2741         queue->sinkresult = GST_FLOW_FLUSHING;
2742         GST_QUEUE2_SIGNAL_DEL (queue);
2743         queue->last_query = FALSE;
2744         g_cond_signal (&queue->query_handled);
2745         GST_QUEUE2_MUTEX_UNLOCK (queue);
2746
2747         gst_event_unref (event);
2748       }
2749       break;
2750     }
2751     case GST_EVENT_FLUSH_STOP:
2752     {
2753       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2754         /* forward event */
2755         ret = gst_pad_push_event (queue->srcpad, event);
2756
2757         GST_QUEUE2_MUTEX_LOCK (queue);
2758         gst_queue2_locked_flush (queue, FALSE, TRUE);
2759         queue->srcresult = GST_FLOW_OK;
2760         queue->sinkresult = GST_FLOW_OK;
2761         queue->is_eos = FALSE;
2762         queue->unexpected = FALSE;
2763         queue->seeking = FALSE;
2764         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2765         /* reset rate counters */
2766         reset_rate_timer (queue);
2767         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2768             queue->srcpad, NULL);
2769         GST_QUEUE2_MUTEX_UNLOCK (queue);
2770       } else {
2771         GST_QUEUE2_MUTEX_LOCK (queue);
2772         queue->segment_event_received = FALSE;
2773         queue->is_eos = FALSE;
2774         queue->unexpected = FALSE;
2775         queue->sinkresult = GST_FLOW_OK;
2776         queue->seeking = FALSE;
2777         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2778         GST_QUEUE2_MUTEX_UNLOCK (queue);
2779
2780         gst_event_unref (event);
2781       }
2782       g_object_notify (G_OBJECT (queue), "bitrate");
2783       break;
2784     }
2785     case GST_EVENT_TAG:{
2786       if (queue->use_tags_bitrate) {
2787         GstTagList *tags;
2788         guint bitrate;
2789
2790         gst_event_parse_tag (event, &tags);
2791         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2792             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2793           GST_QUEUE2_MUTEX_LOCK (queue);
2794           queue->sink_tags_bitrate = bitrate;
2795           GST_QUEUE2_MUTEX_UNLOCK (queue);
2796           GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
2797           g_object_notify (G_OBJECT (queue), "bitrate");
2798         }
2799       }
2800       /* Fall-through */
2801     }
2802     default:
2803       if (GST_EVENT_IS_SERIALIZED (event)) {
2804         gboolean bitrate_changed = TRUE;
2805         /* serialized events go in the queue */
2806
2807         /* STREAM_START and SEGMENT reset the EOS status of a
2808          * pad. Change the cached sinkpad flow result accordingly */
2809         if (queue->sinkresult == GST_FLOW_EOS
2810             && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
2811                 || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
2812           queue->sinkresult = GST_FLOW_OK;
2813
2814         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2815         if (queue->srcresult != GST_FLOW_OK) {
2816           /* Errors in sticky event pushing are no problem and ignored here
2817            * as they will cause more meaningful errors during data flow.
2818            * For EOS events, that are not followed by data flow, we still
2819            * return FALSE here though and report an error.
2820            */
2821           if (!GST_EVENT_IS_STICKY (event)) {
2822             goto out_flow_error;
2823           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2824             if (queue->srcresult == GST_FLOW_NOT_LINKED
2825                 || queue->srcresult < GST_FLOW_EOS) {
2826               GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
2827             }
2828             goto out_flow_error;
2829           }
2830         }
2831
2832         /* refuse more events on EOS unless they unset the EOS status */
2833         if (queue->is_eos) {
2834           switch (GST_EVENT_TYPE (event)) {
2835             case GST_EVENT_STREAM_START:
2836             case GST_EVENT_SEGMENT:
2837               /* Restart the loop */
2838               if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2839                 queue->srcresult = GST_FLOW_OK;
2840                 queue->is_eos = FALSE;
2841                 queue->unexpected = FALSE;
2842                 queue->seeking = FALSE;
2843                 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2844                 /* reset rate counters */
2845                 reset_rate_timer (queue);
2846                 gst_pad_start_task (queue->srcpad,
2847                     (GstTaskFunction) gst_queue2_loop, queue->srcpad, NULL);
2848               } else {
2849                 queue->is_eos = FALSE;
2850                 queue->unexpected = FALSE;
2851                 queue->seeking = FALSE;
2852                 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2853               }
2854               bitrate_changed = TRUE;
2855
2856               break;
2857             default:
2858               goto out_eos;
2859           }
2860         }
2861
2862         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2863         GST_QUEUE2_MUTEX_UNLOCK (queue);
2864         gst_queue2_post_buffering (queue);
2865         if (bitrate_changed)
2866           g_object_notify (G_OBJECT (queue), "bitrate");
2867       } else {
2868         /* non-serialized events are passed downstream. */
2869         ret = gst_pad_push_event (queue->srcpad, event);
2870       }
2871       break;
2872   }
2873   if (ret == FALSE)
2874     return GST_FLOW_ERROR;
2875   return GST_FLOW_OK;
2876
2877   /* ERRORS */
2878 out_flushing:
2879   {
2880     GstFlowReturn ret = queue->sinkresult;
2881     GST_DEBUG_OBJECT (queue, "refusing event, we are %s",
2882         gst_flow_get_name (ret));
2883     GST_QUEUE2_MUTEX_UNLOCK (queue);
2884     gst_event_unref (event);
2885     return ret;
2886   }
2887 out_eos:
2888   {
2889     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2890     GST_QUEUE2_MUTEX_UNLOCK (queue);
2891     gst_event_unref (event);
2892     return GST_FLOW_EOS;
2893   }
2894 out_flow_error:
2895   {
2896     GST_LOG_OBJECT (queue,
2897         "refusing event, we have a downstream flow error: %s",
2898         gst_flow_get_name (queue->srcresult));
2899     GST_QUEUE2_MUTEX_UNLOCK (queue);
2900     gst_event_unref (event);
2901     return queue->srcresult;
2902   }
2903 }
2904
2905 static gboolean
2906 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2907     GstQuery * query)
2908 {
2909   GstQueue2 *queue;
2910   gboolean res;
2911
2912   queue = GST_QUEUE2 (parent);
2913
2914   switch (GST_QUERY_TYPE (query)) {
2915     default:
2916       if (GST_QUERY_IS_SERIALIZED (query)) {
2917         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2918             "received query %" GST_PTR_FORMAT, query);
2919         /* serialized events go in the queue. We need to be certain that we
2920          * don't cause deadlocks waiting for the query return value. We check if
2921          * the queue is empty (nothing is blocking downstream and the query can
2922          * be pushed for sure) or we are not buffering. If we are buffering,
2923          * the pipeline waits to unblock downstream until our queue fills up
2924          * completely, which can not happen if we block on the query..
2925          * Therefore we only potentially block when we are not buffering. */
2926         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2927         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2928                 || !queue->use_buffering)) {
2929           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2930             gst_queue2_locked_enqueue (queue, query,
2931                 GST_QUEUE2_ITEM_TYPE_QUERY);
2932
2933             STATUS (queue, queue->sinkpad, "wait for QUERY");
2934             while (queue->sinkresult == GST_FLOW_OK &&
2935                 queue->last_handled_query != query)
2936               g_cond_wait (&queue->query_handled, &queue->qlock);
2937             queue->last_handled_query = NULL;
2938             if (queue->sinkresult != GST_FLOW_OK)
2939               goto out_flushing;
2940             res = queue->last_query;
2941           } else {
2942             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2943             res = FALSE;
2944           }
2945         } else {
2946           GST_DEBUG_OBJECT (queue,
2947               "refusing query, we are not using the queue");
2948           res = FALSE;
2949         }
2950         GST_QUEUE2_MUTEX_UNLOCK (queue);
2951         gst_queue2_post_buffering (queue);
2952       } else {
2953         res = gst_pad_query_default (pad, parent, query);
2954       }
2955       break;
2956   }
2957   return res;
2958
2959   /* ERRORS */
2960 out_flushing:
2961   {
2962     GST_DEBUG_OBJECT (queue, "refusing query, we are %s",
2963         gst_flow_get_name (queue->sinkresult));
2964     GST_QUEUE2_MUTEX_UNLOCK (queue);
2965     return FALSE;
2966   }
2967 }
2968
2969 static gboolean
2970 gst_queue2_is_empty (GstQueue2 * queue)
2971 {
2972   /* never empty on EOS */
2973   if (queue->is_eos)
2974     return FALSE;
2975
2976   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2977     return queue->current->writing_pos <= queue->current->max_reading_pos;
2978   } else {
2979     if (gst_queue_array_get_length (queue->queue) == 0)
2980       return TRUE;
2981   }
2982
2983   return FALSE;
2984 }
2985
2986 static gboolean
2987 gst_queue2_is_filled (GstQueue2 * queue)
2988 {
2989   gboolean res;
2990
2991   /* always filled on EOS */
2992   if (queue->is_eos)
2993     return TRUE;
2994
2995 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2996     (queue->cur_level.format) >= ((alt_max) ? \
2997       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2998
2999   /* if using a ring buffer we're filled if all ring buffer space is used
3000    * _by the current range_ */
3001   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
3002     guint64 rb_size = queue->ring_buffer_max_size;
3003     GST_DEBUG_OBJECT (queue,
3004         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
3005         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
3006     return CHECK_FILLED (bytes, rb_size);
3007   }
3008
3009   /* if using file, we're never filled if we don't have EOS */
3010   if (QUEUE_IS_USING_TEMP_FILE (queue))
3011     return FALSE;
3012
3013   /* we are never filled when we have no buffers at all */
3014   if (queue->cur_level.buffers == 0)
3015     return FALSE;
3016
3017   /* we are filled if one of the current levels exceeds the max */
3018   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
3019       || CHECK_FILLED (time, 0);
3020
3021   /* if we need to, use the rate estimate to check against the max time we are
3022    * allowed to queue */
3023   if (queue->use_rate_estimate)
3024     res |= CHECK_FILLED (rate_time, 0);
3025
3026 #undef CHECK_FILLED
3027   return res;
3028 }
3029
3030 static GstFlowReturn
3031 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
3032     GstMiniObject * item, GstQueue2ItemType item_type)
3033 {
3034   /* we have to lock the queue since we span threads */
3035   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
3036   /* when we received EOS, we refuse more data */
3037   if (queue->is_eos)
3038     goto out_eos;
3039   /* when we received unexpected from downstream, refuse more buffers */
3040   if (queue->unexpected)
3041     goto out_unexpected;
3042
3043   /* while we didn't receive the newsegment, we're seeking and we skip data */
3044   if (queue->seeking)
3045     goto out_seeking;
3046
3047   if (!gst_queue2_wait_free_space (queue))
3048     goto out_flushing;
3049
3050   /* put buffer in queue now */
3051   gst_queue2_locked_enqueue (queue, item, item_type);
3052   GST_QUEUE2_MUTEX_UNLOCK (queue);
3053   gst_queue2_post_buffering (queue);
3054
3055   return GST_FLOW_OK;
3056
3057   /* special conditions */
3058 out_flushing:
3059   {
3060     GstFlowReturn ret = queue->sinkresult;
3061
3062     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3063         "exit because task paused, reason: %s", gst_flow_get_name (ret));
3064     GST_QUEUE2_MUTEX_UNLOCK (queue);
3065     gst_mini_object_unref (item);
3066
3067     return ret;
3068   }
3069 out_eos:
3070   {
3071     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
3072     GST_QUEUE2_MUTEX_UNLOCK (queue);
3073     gst_mini_object_unref (item);
3074
3075     return GST_FLOW_EOS;
3076   }
3077 out_seeking:
3078   {
3079     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
3080     GST_QUEUE2_MUTEX_UNLOCK (queue);
3081     gst_mini_object_unref (item);
3082
3083     return GST_FLOW_OK;
3084   }
3085 out_unexpected:
3086   {
3087     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
3088     GST_QUEUE2_MUTEX_UNLOCK (queue);
3089     gst_mini_object_unref (item);
3090
3091     return GST_FLOW_EOS;
3092   }
3093 }
3094
3095 static GstFlowReturn
3096 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
3097 {
3098   GstQueue2 *queue;
3099
3100   queue = GST_QUEUE2 (parent);
3101
3102   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
3103       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
3104       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
3105       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
3106       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
3107
3108   return gst_queue2_chain_buffer_or_buffer_list (queue,
3109       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
3110 }
3111
3112 static GstFlowReturn
3113 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
3114     GstBufferList * buffer_list)
3115 {
3116   GstQueue2 *queue;
3117
3118   queue = GST_QUEUE2 (parent);
3119
3120   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3121       "received buffer list %p", buffer_list);
3122
3123   return gst_queue2_chain_buffer_or_buffer_list (queue,
3124       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
3125 }
3126
3127 static GstMiniObject *
3128 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
3129 {
3130   GstMiniObject *data;
3131
3132   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
3133
3134   /* stop pushing buffers, we dequeue all items until we see an item that we
3135    * can push again, which is EOS or SEGMENT. If there is nothing in the
3136    * queue we can push, we set a flag to make the sinkpad refuse more
3137    * buffers with an EOS return value until we receive something
3138    * pushable again or we get flushed. */
3139   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
3140     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
3141       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3142           "dropping EOS buffer %p", data);
3143       gst_buffer_unref (GST_BUFFER_CAST (data));
3144     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
3145       GstEvent *event = GST_EVENT_CAST (data);
3146       GstEventType type = GST_EVENT_TYPE (event);
3147
3148       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
3149           || type == GST_EVENT_STREAM_START) {
3150         /* we found a pushable item in the queue, push it out */
3151         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3152             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
3153         return data;
3154       }
3155       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3156           "dropping EOS event %p", event);
3157       gst_event_unref (event);
3158     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
3159       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3160           "dropping EOS buffer list %p", data);
3161       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
3162     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
3163       queue->last_query = FALSE;
3164       g_cond_signal (&queue->query_handled);
3165       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
3166     }
3167   }
3168   /* no more items in the queue. Set the unexpected flag so that upstream
3169    * make us refuse any more buffers on the sinkpad. Since we will still
3170    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
3171    * task function does not shut down. */
3172   queue->unexpected = TRUE;
3173   return NULL;
3174 }
3175
3176 /* dequeue an item from the queue an push it downstream. This functions returns
3177  * the result of the push. */
3178 static GstFlowReturn
3179 gst_queue2_push_one (GstQueue2 * queue)
3180 {
3181   GstFlowReturn result;
3182   GstMiniObject *data;
3183   GstQueue2ItemType item_type;
3184
3185   data = gst_queue2_locked_dequeue (queue, &item_type);
3186   if (data == NULL)
3187     goto no_item;
3188
3189 next:
3190   result = queue->srcresult;
3191   STATUS (queue, queue->srcpad, "We have something dequeud");
3192   g_atomic_int_set (&queue->downstream_may_block,
3193       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
3194       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
3195   GST_QUEUE2_MUTEX_UNLOCK (queue);
3196   gst_queue2_post_buffering (queue);
3197
3198   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
3199     GstBuffer *buffer;
3200
3201     buffer = GST_BUFFER_CAST (data);
3202
3203     result = gst_pad_push (queue->srcpad, buffer);
3204     g_atomic_int_set (&queue->downstream_may_block, 0);
3205
3206     /* need to check for srcresult here as well */
3207     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3208     if (result == GST_FLOW_EOS) {
3209       data = gst_queue2_dequeue_on_eos (queue, &item_type);
3210       if (data != NULL)
3211         goto next;
3212       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
3213        * to the caller so that the task function does not shut down */
3214       result = GST_FLOW_OK;
3215     }
3216   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
3217     GstEvent *event = GST_EVENT_CAST (data);
3218     GstEventType type = GST_EVENT_TYPE (event);
3219
3220     if (type == GST_EVENT_TAG) {
3221       if (queue->use_tags_bitrate) {
3222         GstTagList *tags;
3223         guint bitrate;
3224
3225         gst_event_parse_tag (event, &tags);
3226         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
3227             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
3228           GST_QUEUE2_MUTEX_LOCK (queue);
3229           queue->src_tags_bitrate = bitrate;
3230           GST_QUEUE2_MUTEX_UNLOCK (queue);
3231           GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
3232           g_object_notify (G_OBJECT (queue), "bitrate");
3233         }
3234       }
3235     }
3236
3237     gst_pad_push_event (queue->srcpad, event);
3238
3239     /* if we're EOS, return EOS so that the task pauses. */
3240     if (type == GST_EVENT_EOS) {
3241       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3242           "pushed EOS event %p, return EOS", event);
3243       result = GST_FLOW_EOS;
3244     }
3245
3246     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3247   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
3248     GstBufferList *buffer_list;
3249
3250     buffer_list = GST_BUFFER_LIST_CAST (data);
3251
3252     result = gst_pad_push_list (queue->srcpad, buffer_list);
3253     g_atomic_int_set (&queue->downstream_may_block, 0);
3254
3255     /* need to check for srcresult here as well */
3256     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3257     if (result == GST_FLOW_EOS) {
3258       data = gst_queue2_dequeue_on_eos (queue, &item_type);
3259       if (data != NULL)
3260         goto next;
3261       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
3262        * to the caller so that the task function does not shut down */
3263       result = GST_FLOW_OK;
3264     }
3265   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
3266     GstQuery *query = GST_QUERY_CAST (data);
3267
3268     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
3269     queue->last_handled_query = query;
3270     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
3271     GST_LOG_OBJECT (queue->srcpad, "Peered query");
3272     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3273         "did query %p, return %d", query, queue->last_query);
3274     g_cond_signal (&queue->query_handled);
3275     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3276     result = GST_FLOW_OK;
3277   }
3278   return result;
3279
3280   /* ERRORS */
3281 no_item:
3282   {
3283     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3284         "exit because we have no item in the queue");
3285     return GST_FLOW_ERROR;
3286   }
3287 out_flushing:
3288   {
3289     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are %s",
3290         gst_flow_get_name (queue->srcresult));
3291     return queue->srcresult;
3292   }
3293 }
3294
3295 /* called repeatedly with @pad as the source pad. This function should push out
3296  * data to the peer element. */
3297 static void
3298 gst_queue2_loop (GstPad * pad)
3299 {
3300   GstQueue2 *queue;
3301 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
3302   GstFlowReturn ret = GST_FLOW_OK;
3303 #else
3304   GstFlowReturn ret;
3305 #endif
3306
3307   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
3308
3309   /* have to lock for thread-safety */
3310   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3311
3312   if (gst_queue2_is_empty (queue)) {
3313     gboolean started;
3314
3315     /* pause the timer while we wait. The fact that we are waiting does not mean
3316      * the byterate on the output pad is lower */
3317     if ((started = queue->out_timer_started))
3318       g_timer_stop (queue->out_timer);
3319
3320     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
3321         "queue is empty, waiting for new data");
3322     do {
3323       /* Wait for data to be available, we could be unlocked because of a flush. */
3324       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
3325     }
3326     while (gst_queue2_is_empty (queue));
3327
3328     /* and continue if we were running before */
3329     if (started)
3330       g_timer_continue (queue->out_timer);
3331   }
3332 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
3333   /* if buffering mode is GST_BUFFERING_LIVE, it is rtsp streaming */
3334   if (!((queue->mode == GST_BUFFERING_LIVE) && queue->is_buffering)) {
3335     ret = gst_queue2_push_one (queue);
3336   }
3337 #else
3338   ret = gst_queue2_push_one (queue);
3339 #endif
3340   queue->srcresult = ret;
3341   queue->sinkresult = ret;
3342   if (ret != GST_FLOW_OK)
3343     goto out_flushing;
3344
3345   GST_QUEUE2_MUTEX_UNLOCK (queue);
3346   gst_queue2_post_buffering (queue);
3347
3348   return;
3349
3350   /* ERRORS */
3351 out_flushing:
3352   {
3353     gboolean eos = queue->is_eos;
3354     GstFlowReturn ret = queue->srcresult;
3355
3356     gst_pad_pause_task (queue->srcpad);
3357     if (ret == GST_FLOW_FLUSHING) {
3358       gst_queue2_locked_flush (queue, FALSE, FALSE);
3359     } else {
3360       GST_QUEUE2_SIGNAL_DEL (queue);
3361       queue->last_query = FALSE;
3362       g_cond_signal (&queue->query_handled);
3363     }
3364     GST_QUEUE2_MUTEX_UNLOCK (queue);
3365     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3366         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
3367     /* Recalculate buffering levels before stopping since the source flow
3368      * might cause a different buffering level (like NOT_LINKED making
3369      * the queue appear as full) */
3370     if (queue->use_buffering)
3371       update_buffering (queue);
3372     gst_queue2_post_buffering (queue);
3373     /* let app know about us giving up if upstream is not expected to do so */
3374     /* EOS is already taken care of elsewhere */
3375     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
3376       GST_ELEMENT_FLOW_ERROR (queue, ret);
3377       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
3378     }
3379     return;
3380   }
3381 }
3382
3383 static gboolean
3384 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3385 {
3386   gboolean res = TRUE;
3387   GstQueue2 *queue = GST_QUEUE2 (parent);
3388
3389 #ifndef GST_DISABLE_GST_DEBUG
3390   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
3391       event, GST_EVENT_TYPE_NAME (event));
3392 #endif
3393
3394   switch (GST_EVENT_TYPE (event)) {
3395     case GST_EVENT_FLUSH_START:
3396       if (QUEUE_IS_USING_QUEUE (queue)) {
3397         /* just forward upstream */
3398         res = gst_pad_push_event (queue->sinkpad, event);
3399       } else {
3400         /* now unblock the getrange function */
3401         GST_QUEUE2_MUTEX_LOCK (queue);
3402         GST_DEBUG_OBJECT (queue, "flushing");
3403         queue->srcresult = GST_FLOW_FLUSHING;
3404         GST_QUEUE2_SIGNAL_ADD (queue);
3405         GST_QUEUE2_MUTEX_UNLOCK (queue);
3406
3407         /* when using a temp file, we eat the event */
3408         res = TRUE;
3409         gst_event_unref (event);
3410       }
3411       break;
3412     case GST_EVENT_FLUSH_STOP:
3413       if (QUEUE_IS_USING_QUEUE (queue)) {
3414         /* just forward upstream */
3415         res = gst_pad_push_event (queue->sinkpad, event);
3416       } else {
3417         /* now unblock the getrange function */
3418         GST_QUEUE2_MUTEX_LOCK (queue);
3419         queue->srcresult = GST_FLOW_OK;
3420         GST_QUEUE2_MUTEX_UNLOCK (queue);
3421
3422         /* when using a temp file, we eat the event */
3423         res = TRUE;
3424         gst_event_unref (event);
3425       }
3426       break;
3427     case GST_EVENT_RECONFIGURE:
3428       GST_QUEUE2_MUTEX_LOCK (queue);
3429       /* assume downstream is linked now and try to push again */
3430       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
3431         queue->srcresult = GST_FLOW_OK;
3432         queue->sinkresult = GST_FLOW_OK;
3433         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
3434           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
3435               NULL);
3436         }
3437
3438       }
3439       GST_QUEUE2_MUTEX_UNLOCK (queue);
3440
3441       /* force a new bitrate query to be performed */
3442       query_downstream_bitrate (queue);
3443
3444       res = gst_pad_push_event (queue->sinkpad, event);
3445       break;
3446     default:
3447       res = gst_pad_push_event (queue->sinkpad, event);
3448       break;
3449   }
3450
3451   return res;
3452 }
3453
3454 static gboolean
3455 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3456 {
3457   GstQueue2 *queue;
3458
3459   queue = GST_QUEUE2 (parent);
3460
3461   switch (GST_QUERY_TYPE (query)) {
3462     case GST_QUERY_POSITION:
3463     {
3464       gint64 peer_pos;
3465       GstFormat format;
3466
3467       if (!gst_pad_peer_query (queue->sinkpad, query))
3468         goto peer_failed;
3469
3470       /* get peer position */
3471       gst_query_parse_position (query, &format, &peer_pos);
3472
3473       /* FIXME: this code assumes that there's no discont in the queue */
3474       switch (format) {
3475         case GST_FORMAT_BYTES:
3476           peer_pos -= queue->cur_level.bytes;
3477           if (peer_pos < 0)     /* Clamp result to 0 */
3478             peer_pos = 0;
3479           break;
3480         case GST_FORMAT_TIME:
3481           peer_pos -= queue->cur_level.time;
3482           if (peer_pos < 0)     /* Clamp result to 0 */
3483             peer_pos = 0;
3484           break;
3485         default:
3486           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3487               "know how to adjust value", gst_format_get_name (format));
3488           return FALSE;
3489       }
3490       /* set updated position */
3491       gst_query_set_position (query, format, peer_pos);
3492       break;
3493     }
3494     case GST_QUERY_DURATION:
3495     {
3496       GST_DEBUG_OBJECT (queue, "doing peer query");
3497
3498       if (!gst_pad_peer_query (queue->sinkpad, query))
3499         goto peer_failed;
3500
3501       GST_DEBUG_OBJECT (queue, "peer query success");
3502       break;
3503     }
3504     case GST_QUERY_BUFFERING:
3505     {
3506       gint percent;
3507       gboolean is_buffering;
3508       GstBufferingMode mode;
3509       gint avg_in, avg_out;
3510       gint64 buffering_left;
3511
3512       GST_DEBUG_OBJECT (queue, "query buffering");
3513
3514       get_buffering_level (queue, &is_buffering, &percent);
3515       percent = convert_to_buffering_percent (queue, percent);
3516       gst_query_set_buffering_percent (query, is_buffering, percent);
3517
3518       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3519           &buffering_left);
3520       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3521           buffering_left);
3522
3523       if (!QUEUE_IS_USING_QUEUE (queue)) {
3524         /* add ranges for download and ringbuffer buffering */
3525         GstFormat format;
3526         gint64 start, stop, range_start, range_stop;
3527         guint64 writing_pos;
3528         gint64 estimated_total;
3529         gint64 duration;
3530         gboolean peer_res, is_eos;
3531         GstQueue2Range *queued_ranges;
3532
3533         /* we need a current download region */
3534         if (queue->current == NULL)
3535           return FALSE;
3536
3537         writing_pos = queue->current->writing_pos;
3538         is_eos = queue->is_eos;
3539
3540         if (is_eos) {
3541           /* we're EOS, we know the duration in bytes now */
3542           peer_res = TRUE;
3543           duration = writing_pos;
3544         } else {
3545           /* get duration of upstream in bytes */
3546           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3547               GST_FORMAT_BYTES, &duration);
3548         }
3549
3550         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3551             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3552
3553         /* calculate remaining and total download time */
3554         if (peer_res && avg_in > 0.0)
3555           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3556         else
3557           estimated_total = -1;
3558
3559         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3560             estimated_total);
3561
3562         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3563
3564         switch (format) {
3565           case GST_FORMAT_PERCENT:
3566             /* we need duration */
3567             if (!peer_res)
3568               goto peer_failed;
3569
3570             start = 0;
3571             /* get our available data relative to the duration */
3572             if (duration != -1)
3573               stop =
3574                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3575                   duration);
3576             else
3577               stop = -1;
3578             break;
3579           case GST_FORMAT_BYTES:
3580             start = 0;
3581             stop = writing_pos;
3582             break;
3583           default:
3584             start = -1;
3585             stop = -1;
3586             break;
3587         }
3588
3589         /* fill out the buffered ranges */
3590         for (queued_ranges = queue->ranges; queued_ranges;
3591             queued_ranges = queued_ranges->next) {
3592           switch (format) {
3593             case GST_FORMAT_PERCENT:
3594               if (duration == -1) {
3595                 range_start = 0;
3596                 range_stop = 0;
3597                 break;
3598               }
3599 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3600               range_start =
3601                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3602                   queued_ranges->reading_pos, duration);
3603 #else
3604               range_start =
3605                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3606                   queued_ranges->offset, duration);
3607 #endif
3608               range_stop =
3609                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3610                   queued_ranges->writing_pos, duration);
3611               break;
3612             case GST_FORMAT_BYTES:
3613 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3614               range_start = queued_ranges->reading_pos;
3615 #else
3616               range_start = queued_ranges->offset;
3617 #endif
3618               range_stop = queued_ranges->writing_pos;
3619               break;
3620             default:
3621               range_start = -1;
3622               range_stop = -1;
3623               break;
3624           }
3625           if (range_start == range_stop)
3626             continue;
3627           GST_DEBUG_OBJECT (queue,
3628               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3629               G_GINT64_FORMAT, range_start, range_stop);
3630           gst_query_add_buffering_range (query, range_start, range_stop);
3631         }
3632
3633         gst_query_set_buffering_range (query, format, start, stop,
3634             estimated_total);
3635       }
3636       break;
3637     }
3638     case GST_QUERY_SCHEDULING:
3639     {
3640       gboolean pull_mode;
3641       GstSchedulingFlags flags = 0;
3642 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3643       if ((!QUEUE_IS_USING_QUEUE (queue)) && !gst_pad_peer_query (queue->sinkpad, query))
3644 #else
3645       if (!gst_pad_peer_query (queue->sinkpad, query))
3646 #endif
3647         goto peer_failed;
3648
3649       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3650
3651 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3652       if (!(flags & GST_SCHEDULING_FLAG_SEEKABLE)) {
3653         GST_DEBUG_OBJECT(queue, "peer can support only push mode");
3654         gst_query_set_scheduling (query, flags, 0, -1, 0);
3655         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3656         break;
3657       }
3658 #endif
3659       GST_DEBUG_OBJECT(queue, "peer can support pull mode");
3660
3661       /* we can operate in pull mode when we are using a tempfile */
3662       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3663
3664       if (pull_mode)
3665         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3666       gst_query_set_scheduling (query, flags, 0, -1, 0);
3667       if (pull_mode)
3668         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3669       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3670       break;
3671     }
3672     default:
3673       /* peer handled other queries */
3674       if (!gst_pad_query_default (pad, parent, query))
3675         goto peer_failed;
3676       break;
3677   }
3678
3679   return TRUE;
3680
3681   /* ERRORS */
3682 peer_failed:
3683   {
3684     GST_DEBUG_OBJECT (queue, "failed peer query");
3685     return FALSE;
3686   }
3687 }
3688
3689 static gboolean
3690 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3691 {
3692   GstQueue2 *queue = GST_QUEUE2 (element);
3693
3694   /* simply forward to the srcpad query function */
3695   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3696       query);
3697 }
3698
3699 static void
3700 gst_queue2_update_upstream_size (GstQueue2 * queue)
3701 {
3702   gint64 upstream_size = -1;
3703
3704   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3705           &upstream_size)) {
3706     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3707
3708     /* upstream_size can be negative but queue->upstream_size is unsigned.
3709      * Prevent setting negative values to it (the query can return -1) */
3710     if (upstream_size >= 0)
3711       queue->upstream_size = upstream_size;
3712     else
3713       queue->upstream_size = 0;
3714   }
3715 }
3716
3717 static GstFlowReturn
3718 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3719     guint length, GstBuffer ** buffer)
3720 {
3721   GstQueue2 *queue;
3722   GstFlowReturn ret;
3723
3724   queue = GST_QUEUE2_CAST (parent);
3725
3726   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3727   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3728   offset = (offset == -1) ? queue->current->reading_pos : offset;
3729
3730   GST_DEBUG_OBJECT (queue,
3731       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3732
3733   /* catch any reads beyond the size of the file here to make sure queue2
3734    * doesn't send seek events beyond the size of the file upstream, since
3735    * that would confuse elements such as souphttpsrc and/or http servers.
3736    * Demuxers often just loop until EOS at the end of the file to figure out
3737    * when they've read all the end-headers or index chunks. */
3738   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3739     gst_queue2_update_upstream_size (queue);
3740     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3741       goto out_unexpected;
3742   }
3743
3744   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3745     gst_queue2_update_upstream_size (queue);
3746     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3747       length = queue->upstream_size - offset;
3748       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3749     }
3750   }
3751
3752   /* FIXME - function will block when the range is not yet available */
3753   ret = gst_queue2_create_read (queue, offset, length, buffer);
3754   GST_QUEUE2_MUTEX_UNLOCK (queue);
3755   gst_queue2_post_buffering (queue);
3756
3757   return ret;
3758
3759   /* ERRORS */
3760 out_flushing:
3761   {
3762     ret = queue->srcresult;
3763
3764     GST_DEBUG_OBJECT (queue, "we are %s", gst_flow_get_name (ret));
3765     GST_QUEUE2_MUTEX_UNLOCK (queue);
3766     return ret;
3767   }
3768 out_unexpected:
3769   {
3770     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3771     GST_QUEUE2_MUTEX_UNLOCK (queue);
3772     return GST_FLOW_EOS;
3773   }
3774 }
3775
3776 /* sink currently only operates in push mode */
3777 static gboolean
3778 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3779     GstPadMode mode, gboolean active)
3780 {
3781   gboolean result;
3782   GstQueue2 *queue;
3783
3784   queue = GST_QUEUE2 (parent);
3785
3786   switch (mode) {
3787     case GST_PAD_MODE_PUSH:
3788       if (active) {
3789         GST_QUEUE2_MUTEX_LOCK (queue);
3790         GST_DEBUG_OBJECT (queue, "activating push mode");
3791         queue->srcresult = GST_FLOW_OK;
3792         queue->sinkresult = GST_FLOW_OK;
3793         queue->is_eos = FALSE;
3794         queue->unexpected = FALSE;
3795         reset_rate_timer (queue);
3796         GST_QUEUE2_MUTEX_UNLOCK (queue);
3797       } else {
3798         /* unblock chain function */
3799         GST_QUEUE2_MUTEX_LOCK (queue);
3800         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3801         queue->srcresult = GST_FLOW_FLUSHING;
3802         queue->sinkresult = GST_FLOW_FLUSHING;
3803         GST_QUEUE2_SIGNAL_DEL (queue);
3804         GST_QUEUE2_MUTEX_UNLOCK (queue);
3805
3806         /* wait until it is unblocked and clean up */
3807         GST_PAD_STREAM_LOCK (pad);
3808         GST_QUEUE2_MUTEX_LOCK (queue);
3809         gst_queue2_locked_flush (queue, TRUE, FALSE);
3810         GST_QUEUE2_MUTEX_UNLOCK (queue);
3811         GST_PAD_STREAM_UNLOCK (pad);
3812       }
3813       result = TRUE;
3814       break;
3815     default:
3816       result = FALSE;
3817       break;
3818   }
3819   return result;
3820 }
3821
3822 /* src operating in push mode, we start a task on the source pad that pushes out
3823  * buffers from the queue */
3824 static gboolean
3825 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3826 {
3827   gboolean result = FALSE;
3828   GstQueue2 *queue;
3829
3830   queue = GST_QUEUE2 (parent);
3831
3832   if (active) {
3833     GST_QUEUE2_MUTEX_LOCK (queue);
3834     GST_DEBUG_OBJECT (queue, "activating push mode");
3835     queue->srcresult = GST_FLOW_OK;
3836     queue->sinkresult = GST_FLOW_OK;
3837     queue->is_eos = FALSE;
3838     queue->unexpected = FALSE;
3839     result =
3840         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3841     GST_QUEUE2_MUTEX_UNLOCK (queue);
3842   } else {
3843     /* unblock loop function */
3844     GST_QUEUE2_MUTEX_LOCK (queue);
3845     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3846     queue->srcresult = GST_FLOW_FLUSHING;
3847     queue->sinkresult = GST_FLOW_FLUSHING;
3848     /* the item add signal will unblock */
3849     GST_QUEUE2_SIGNAL_ADD (queue);
3850     GST_QUEUE2_MUTEX_UNLOCK (queue);
3851
3852     /* step 2, make sure streaming finishes */
3853     result = gst_pad_stop_task (pad);
3854
3855     GST_QUEUE2_MUTEX_LOCK (queue);
3856     gst_queue2_locked_flush (queue, FALSE, FALSE);
3857     GST_QUEUE2_MUTEX_UNLOCK (queue);
3858   }
3859
3860   return result;
3861 }
3862
3863 /* pull mode, downstream will call our getrange function */
3864 static gboolean
3865 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3866 {
3867   gboolean result;
3868   GstQueue2 *queue;
3869
3870   queue = GST_QUEUE2 (parent);
3871
3872   if (active) {
3873     GST_QUEUE2_MUTEX_LOCK (queue);
3874     if (!QUEUE_IS_USING_QUEUE (queue)) {
3875       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3876         /* open the temp file now */
3877         result = gst_queue2_open_temp_location_file (queue);
3878       } else if (!queue->ring_buffer) {
3879         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3880         result = ! !queue->ring_buffer;
3881       } else {
3882         result = TRUE;
3883       }
3884
3885       GST_DEBUG_OBJECT (queue, "activating pull mode");
3886       init_ranges (queue);
3887       queue->srcresult = GST_FLOW_OK;
3888       queue->sinkresult = GST_FLOW_OK;
3889       queue->is_eos = FALSE;
3890       queue->unexpected = FALSE;
3891       queue->upstream_size = 0;
3892     } else {
3893       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3894       /* this is not allowed, we cannot operate in pull mode without a temp
3895        * file. */
3896       queue->srcresult = GST_FLOW_FLUSHING;
3897       queue->sinkresult = GST_FLOW_FLUSHING;
3898       result = FALSE;
3899     }
3900     GST_QUEUE2_MUTEX_UNLOCK (queue);
3901   } else {
3902     GST_QUEUE2_MUTEX_LOCK (queue);
3903     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3904     queue->srcresult = GST_FLOW_FLUSHING;
3905     queue->sinkresult = GST_FLOW_FLUSHING;
3906     /* this will unlock getrange */
3907     GST_QUEUE2_SIGNAL_ADD (queue);
3908     result = TRUE;
3909     GST_QUEUE2_MUTEX_UNLOCK (queue);
3910   }
3911
3912   return result;
3913 }
3914
3915 static gboolean
3916 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3917     gboolean active)
3918 {
3919   gboolean res;
3920
3921   switch (mode) {
3922     case GST_PAD_MODE_PULL:
3923       res = gst_queue2_src_activate_pull (pad, parent, active);
3924       break;
3925     case GST_PAD_MODE_PUSH:
3926       res = gst_queue2_src_activate_push (pad, parent, active);
3927       break;
3928     default:
3929       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3930       res = FALSE;
3931       break;
3932   }
3933   return res;
3934 }
3935
3936 static GstStateChangeReturn
3937 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3938 {
3939   GstQueue2 *queue;
3940   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3941
3942   queue = GST_QUEUE2 (element);
3943
3944   switch (transition) {
3945     case GST_STATE_CHANGE_NULL_TO_READY:
3946       break;
3947     case GST_STATE_CHANGE_READY_TO_PAUSED:
3948       GST_QUEUE2_MUTEX_LOCK (queue);
3949       if (!QUEUE_IS_USING_QUEUE (queue)) {
3950         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3951           if (!gst_queue2_open_temp_location_file (queue))
3952             ret = GST_STATE_CHANGE_FAILURE;
3953         } else {
3954           if (queue->ring_buffer) {
3955             g_free (queue->ring_buffer);
3956             queue->ring_buffer = NULL;
3957           }
3958           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3959             ret = GST_STATE_CHANGE_FAILURE;
3960         }
3961         init_ranges (queue);
3962       }
3963       queue->segment_event_received = FALSE;
3964       queue->starting_segment = NULL;
3965       gst_event_replace (&queue->stream_start_event, NULL);
3966       GST_QUEUE2_MUTEX_UNLOCK (queue);
3967       query_downstream_bitrate (queue);
3968       break;
3969     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3970 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3971       GST_QUEUE2_MUTEX_LOCK (queue);
3972       queue->is_buffering = FALSE;
3973       GST_QUEUE2_MUTEX_UNLOCK (queue);
3974 #endif
3975       break;
3976     default:
3977       break;
3978   }
3979
3980   if (ret == GST_STATE_CHANGE_FAILURE)
3981     return ret;
3982
3983   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3984
3985   if (ret == GST_STATE_CHANGE_FAILURE)
3986     return ret;
3987
3988   switch (transition) {
3989     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3990       break;
3991     case GST_STATE_CHANGE_PAUSED_TO_READY:
3992       GST_QUEUE2_MUTEX_LOCK (queue);
3993       if (!QUEUE_IS_USING_QUEUE (queue)) {
3994         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3995           gst_queue2_close_temp_location_file (queue);
3996         } else if (queue->ring_buffer) {
3997           g_free (queue->ring_buffer);
3998           queue->ring_buffer = NULL;
3999         }
4000         clean_ranges (queue);
4001       }
4002       if (queue->starting_segment != NULL) {
4003         gst_event_unref (queue->starting_segment);
4004         queue->starting_segment = NULL;
4005       }
4006       gst_event_replace (&queue->stream_start_event, NULL);
4007       GST_QUEUE2_MUTEX_UNLOCK (queue);
4008       break;
4009     case GST_STATE_CHANGE_READY_TO_NULL:
4010       break;
4011     default:
4012       break;
4013   }
4014
4015   return ret;
4016 }
4017
4018 /* changing the capacity of the queue must wake up
4019  * the _chain function, it might have more room now
4020  * to store the buffer/event in the queue */
4021 #define QUEUE_CAPACITY_CHANGE(q) \
4022   GST_QUEUE2_SIGNAL_DEL (queue); \
4023   if (queue->use_buffering)      \
4024     update_buffering (queue);
4025
4026 /* Changing the minimum required fill level must
4027  * wake up the _loop function as it might now
4028  * be able to preceed.
4029  */
4030 #define QUEUE_THRESHOLD_CHANGE(q)\
4031   GST_QUEUE2_SIGNAL_ADD (queue);
4032
4033 static void
4034 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
4035 {
4036   GstState state;
4037
4038   /* the element must be stopped in order to do this */
4039   GST_OBJECT_LOCK (queue);
4040   state = GST_STATE (queue);
4041   if (state != GST_STATE_READY && state != GST_STATE_NULL)
4042     goto wrong_state;
4043   GST_OBJECT_UNLOCK (queue);
4044
4045   /* set new location */
4046   g_free (queue->temp_template);
4047   queue->temp_template = g_strdup (template);
4048
4049   return;
4050
4051 /* ERROR */
4052 wrong_state:
4053   {
4054     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
4055     GST_OBJECT_UNLOCK (queue);
4056   }
4057 }
4058
4059 static void
4060 gst_queue2_set_property (GObject * object,
4061     guint prop_id, const GValue * value, GParamSpec * pspec)
4062 {
4063   GstQueue2 *queue = GST_QUEUE2 (object);
4064
4065   /* someone could change levels here, and since this
4066    * affects the get/put funcs, we need to lock for safety. */
4067   GST_QUEUE2_MUTEX_LOCK (queue);
4068
4069   switch (prop_id) {
4070     case PROP_MAX_SIZE_BYTES:
4071       queue->max_level.bytes = g_value_get_uint (value);
4072       QUEUE_CAPACITY_CHANGE (queue);
4073       break;
4074     case PROP_MAX_SIZE_BUFFERS:
4075       queue->max_level.buffers = g_value_get_uint (value);
4076       QUEUE_CAPACITY_CHANGE (queue);
4077       break;
4078     case PROP_MAX_SIZE_TIME:
4079       queue->max_level.time = g_value_get_uint64 (value);
4080       /* set rate_time to the same value. We use an extra field in the level
4081        * structure so that we can easily access and compare it */
4082       queue->max_level.rate_time = queue->max_level.time;
4083       QUEUE_CAPACITY_CHANGE (queue);
4084       break;
4085     case PROP_USE_BUFFERING:
4086       queue->use_buffering = g_value_get_boolean (value);
4087       if (!queue->use_buffering && queue->is_buffering) {
4088         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
4089             "posting 100%% message");
4090         SET_PERCENT (queue, 100);
4091         queue->is_buffering = FALSE;
4092       }
4093
4094       if (queue->use_buffering) {
4095         queue->is_buffering = TRUE;
4096         update_buffering (queue);
4097       }
4098       break;
4099     case PROP_USE_TAGS_BITRATE:
4100       queue->use_tags_bitrate = g_value_get_boolean (value);
4101       break;
4102     case PROP_USE_RATE_ESTIMATE:
4103       queue->use_rate_estimate = g_value_get_boolean (value);
4104       break;
4105     case PROP_LOW_PERCENT:
4106       queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
4107       if (queue->is_buffering)
4108         update_buffering (queue);
4109       break;
4110     case PROP_HIGH_PERCENT:
4111       queue->high_watermark =
4112           g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
4113       if (queue->is_buffering)
4114         update_buffering (queue);
4115       break;
4116     case PROP_LOW_WATERMARK:
4117       queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
4118       if (queue->is_buffering)
4119         update_buffering (queue);
4120       break;
4121     case PROP_HIGH_WATERMARK:
4122       queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
4123       if (queue->is_buffering)
4124         update_buffering (queue);
4125       break;
4126     case PROP_TEMP_TEMPLATE:
4127       gst_queue2_set_temp_template (queue, g_value_get_string (value));
4128       break;
4129     case PROP_TEMP_REMOVE:
4130       queue->temp_remove = g_value_get_boolean (value);
4131       break;
4132     case PROP_RING_BUFFER_MAX_SIZE:
4133       queue->ring_buffer_max_size = g_value_get_uint64 (value);
4134       break;
4135     case PROP_USE_BITRATE_QUERY:
4136       queue->use_bitrate_query = g_value_get_boolean (value);
4137       break;
4138 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
4139     case PROP_BUFFER_MODE:
4140       queue->mode = g_value_get_enum (value);
4141       break;
4142 #endif
4143     default:
4144       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4145       break;
4146   }
4147
4148   GST_QUEUE2_MUTEX_UNLOCK (queue);
4149   gst_queue2_post_buffering (queue);
4150 }
4151
4152 static void
4153 gst_queue2_get_property (GObject * object,
4154     guint prop_id, GValue * value, GParamSpec * pspec)
4155 {
4156   GstQueue2 *queue = GST_QUEUE2 (object);
4157
4158   GST_QUEUE2_MUTEX_LOCK (queue);
4159
4160   switch (prop_id) {
4161     case PROP_CUR_LEVEL_BYTES:
4162       g_value_set_uint (value, queue->cur_level.bytes);
4163       break;
4164     case PROP_CUR_LEVEL_BUFFERS:
4165       g_value_set_uint (value, queue->cur_level.buffers);
4166       break;
4167     case PROP_CUR_LEVEL_TIME:
4168       g_value_set_uint64 (value, queue->cur_level.time);
4169       break;
4170     case PROP_MAX_SIZE_BYTES:
4171       g_value_set_uint (value, queue->max_level.bytes);
4172       break;
4173     case PROP_MAX_SIZE_BUFFERS:
4174       g_value_set_uint (value, queue->max_level.buffers);
4175       break;
4176     case PROP_MAX_SIZE_TIME:
4177       g_value_set_uint64 (value, queue->max_level.time);
4178       break;
4179     case PROP_USE_BUFFERING:
4180       g_value_set_boolean (value, queue->use_buffering);
4181       break;
4182     case PROP_USE_TAGS_BITRATE:
4183       g_value_set_boolean (value, queue->use_tags_bitrate);
4184       break;
4185     case PROP_USE_RATE_ESTIMATE:
4186       g_value_set_boolean (value, queue->use_rate_estimate);
4187       break;
4188     case PROP_LOW_PERCENT:
4189       g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
4190       break;
4191     case PROP_HIGH_PERCENT:
4192       g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
4193       break;
4194     case PROP_LOW_WATERMARK:
4195       g_value_set_double (value, queue->low_watermark /
4196           (gdouble) MAX_BUFFERING_LEVEL);
4197       break;
4198     case PROP_HIGH_WATERMARK:
4199       g_value_set_double (value, queue->high_watermark /
4200           (gdouble) MAX_BUFFERING_LEVEL);
4201       break;
4202     case PROP_TEMP_TEMPLATE:
4203       g_value_set_string (value, queue->temp_template);
4204       break;
4205     case PROP_TEMP_LOCATION:
4206       g_value_set_string (value, queue->temp_location);
4207       break;
4208     case PROP_TEMP_REMOVE:
4209       g_value_set_boolean (value, queue->temp_remove);
4210       break;
4211     case PROP_RING_BUFFER_MAX_SIZE:
4212       g_value_set_uint64 (value, queue->ring_buffer_max_size);
4213       break;
4214     case PROP_AVG_IN_RATE:
4215     {
4216       gdouble in_rate = queue->byte_in_rate;
4217
4218       /* During the first RATE_INTERVAL, byte_in_rate will not have been
4219        * calculated, so calculate it here. */
4220       if (in_rate == 0.0 && queue->bytes_in
4221           && queue->last_update_in_rates_elapsed > 0.0)
4222         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
4223
4224       g_value_set_int64 (value, (gint64) in_rate);
4225       break;
4226     }
4227     case PROP_USE_BITRATE_QUERY:
4228       g_value_set_boolean (value, queue->use_bitrate_query);
4229       break;
4230     case PROP_BITRATE:{
4231       guint64 bitrate = 0;
4232       if (bitrate == 0 && queue->use_tags_bitrate) {
4233         if (queue->sink_tags_bitrate > 0)
4234           bitrate = queue->sink_tags_bitrate;
4235         else if (queue->src_tags_bitrate)
4236           bitrate = queue->src_tags_bitrate;
4237       }
4238       if (bitrate == 0 && queue->use_bitrate_query) {
4239         bitrate = queue->downstream_bitrate;
4240       }
4241       g_value_set_uint64 (value, (guint64) bitrate);
4242       break;
4243     }
4244 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
4245     case PROP_BUFFER_MODE:
4246       g_value_set_enum (value, queue->mode);
4247       break;
4248 #endif
4249     default:
4250       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4251       break;
4252   }
4253
4254   GST_QUEUE2_MUTEX_UNLOCK (queue);
4255 }