Merge "[Tizen 5.0] omx memory leak in tv product because of null check" into tizen
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  * @title: queue2
29  *
30  * Data is queued until one of the limits specified by the
31  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
32  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
33  * more buffers into the queue will block the pushing thread until more space
34  * becomes available.
35  *
36  * The queue will create a new thread on the source pad to decouple the
37  * processing on sink and source pad.
38  *
39  * You can query how many buffers are queued by reading the
40  * #GstQueue2:current-level-buffers property.
41  *
42  * The default queue size limits are 100 buffers, 2MB of data, or
43  * two seconds worth of data, whichever is reached first.
44  *
45  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
46  * will allocate a random free filename and buffer data in the file.
47  * By using this, it will buffer the entire stream data on the file independently
48  * of the queue size limits, they will only be used for buffering statistics.
49  *
50  * The temp-location property will be used to notify the application of the
51  * allocated filename.
52  */
53
54 #ifdef HAVE_CONFIG_H
55 #include "config.h"
56 #endif
57
58 #include "gstqueue2.h"
59
60 #include <glib/gstdio.h>
61
62 #include "gst/gst-i18n-lib.h"
63 #include "gst/glib-compat-private.h"
64
65 #include <string.h>
66
67 #ifdef G_OS_WIN32
68 #include <io.h>                 /* lseek, open, close, read */
69 #undef lseek
70 #define lseek _lseeki64
71 #undef off_t
72 #define off_t guint64
73 #else
74 #include <unistd.h>
75 #endif
76
77 #ifdef __BIONIC__               /* Android */
78 #undef lseek
79 #define lseek lseek64
80 #undef off_t
81 #define off_t guint64
82 #include <fcntl.h>
83 #endif
84
85 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
86     GST_PAD_SINK,
87     GST_PAD_ALWAYS,
88     GST_STATIC_CAPS_ANY);
89
90 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
91     GST_PAD_SRC,
92     GST_PAD_ALWAYS,
93     GST_STATIC_CAPS_ANY);
94
95 GST_DEBUG_CATEGORY_STATIC (queue_debug);
96 #define GST_CAT_DEFAULT (queue_debug)
97 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
98
99 enum
100 {
101   LAST_SIGNAL
102 };
103
104 /* other defines */
105 #define DEFAULT_BUFFER_SIZE 4096
106 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
107 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
108 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
109
110 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
111
112 /* default property values */
113 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
114 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
115 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
116 #define DEFAULT_USE_BUFFERING      FALSE
117 #define DEFAULT_USE_TAGS_BITRATE   FALSE
118 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
119 #define DEFAULT_LOW_PERCENT        10
120 #define DEFAULT_HIGH_PERCENT       99
121 #define DEFAULT_LOW_WATERMARK      0.01
122 #define DEFAULT_HIGH_WATERMARK     0.99
123 #define DEFAULT_TEMP_REMOVE        TRUE
124 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
125
126 enum
127 {
128   PROP_0,
129   PROP_CUR_LEVEL_BUFFERS,
130   PROP_CUR_LEVEL_BYTES,
131   PROP_CUR_LEVEL_TIME,
132   PROP_MAX_SIZE_BUFFERS,
133   PROP_MAX_SIZE_BYTES,
134   PROP_MAX_SIZE_TIME,
135   PROP_USE_BUFFERING,
136   PROP_USE_TAGS_BITRATE,
137   PROP_USE_RATE_ESTIMATE,
138   PROP_LOW_PERCENT,
139   PROP_HIGH_PERCENT,
140   PROP_LOW_WATERMARK,
141   PROP_HIGH_WATERMARK,
142   PROP_TEMP_TEMPLATE,
143   PROP_TEMP_LOCATION,
144   PROP_TEMP_REMOVE,
145   PROP_RING_BUFFER_MAX_SIZE,
146   PROP_AVG_IN_RATE,
147 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
148   PROP_BUFFER_MODE,
149 #endif
150   PROP_LAST
151 };
152
153 /* Explanation for buffer levels and percentages:
154  *
155  * The buffering_level functions here return a value in a normalized range
156  * that specifies the queue's current fill level. The range goes from 0 to
157  * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
158  *
159  * This is not to be confused with the buffering_percent value, which is
160  * a *relative* quantity - relative to the low/high watermarks.
161  * buffering_percent = 0% means buffering_level is at the low watermark.
162  * buffering_percent = 100% means buffering_level is at the high watermark.
163  * buffering_percent is used for determining if the fill level has reached
164  * the high watermark, and for producing BUFFERING messages. This value
165  * always uses a 0..100 range (since it is a percentage).
166  *
167  * To avoid future confusions, whenever "buffering level" is mentioned, it
168  * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
169  * range. Whenever "buffering_percent" is mentioned, it refers to the
170  * percentage value that is relative to the low/high watermark. */
171
172 /* Using a buffering level range of 0..1000000 to allow for a
173  * resolution in ppm (1 ppm = 0.0001%) */
174 #define MAX_BUFFERING_LEVEL 1000000
175
176 /* How much 1% makes up in the buffer level range */
177 #define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
178
179 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
180   l.buffers = 0;                                        \
181   l.bytes = 0;                                          \
182   l.time = 0;                                           \
183   l.rate_time = 0;                                      \
184 } G_STMT_END
185
186 #define STATUS(queue, pad, msg) \
187   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
188                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
189                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
190                       " ns, %"G_GUINT64_FORMAT" items", \
191                       GST_DEBUG_PAD_NAME (pad), \
192                       queue->cur_level.buffers, \
193                       queue->max_level.buffers, \
194                       queue->cur_level.bytes, \
195                       queue->max_level.bytes, \
196                       queue->cur_level.time, \
197                       queue->max_level.time, \
198                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
199                         queue->current->writing_pos - queue->current->max_reading_pos : \
200                         queue->queue.length))
201
202 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
203   g_mutex_lock (&q->qlock);                                              \
204 } G_STMT_END
205
206 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
207   GST_QUEUE2_MUTEX_LOCK (q);                                            \
208   if (res != GST_FLOW_OK)                                               \
209     goto label;                                                         \
210 } G_STMT_END
211
212 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
213   g_mutex_unlock (&q->qlock);                                            \
214 } G_STMT_END
215
216 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
217   STATUS (queue, q->sinkpad, "wait for DEL");                           \
218   q->waiting_del = TRUE;                                                \
219   g_cond_wait (&q->item_del, &queue->qlock);                              \
220   q->waiting_del = FALSE;                                               \
221   if (res != GST_FLOW_OK) {                                             \
222     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
223     goto label;                                                         \
224   }                                                                     \
225   STATUS (queue, q->sinkpad, "received DEL");                           \
226 } G_STMT_END
227
228 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
229   STATUS (queue, q->srcpad, "wait for ADD");                            \
230   q->waiting_add = TRUE;                                                \
231   g_cond_wait (&q->item_add, &q->qlock);                                  \
232   q->waiting_add = FALSE;                                               \
233   if (res != GST_FLOW_OK) {                                             \
234     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
235     goto label;                                                         \
236   }                                                                     \
237   STATUS (queue, q->srcpad, "received ADD");                            \
238 } G_STMT_END
239
240 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
241   if (q->waiting_del) {                                                 \
242     STATUS (q, q->srcpad, "signal DEL");                                \
243     g_cond_signal (&q->item_del);                                        \
244   }                                                                     \
245 } G_STMT_END
246
247 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
248   if (q->waiting_add) {                                                 \
249     STATUS (q, q->sinkpad, "signal ADD");                               \
250     g_cond_signal (&q->item_add);                                        \
251   }                                                                     \
252 } G_STMT_END
253
254 #define SET_PERCENT(q, perc) G_STMT_START {                              \
255   if (perc != q->buffering_percent) {                                    \
256     q->buffering_percent = perc;                                         \
257     q->percent_changed = TRUE;                                           \
258     GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
259     get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
260         &q->buffering_left);                                             \
261   }                                                                      \
262 } G_STMT_END
263
264 #define _do_init \
265     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
266     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
267         "dataflow inside the queue element");
268 #define gst_queue2_parent_class parent_class
269 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
270
271 static void gst_queue2_finalize (GObject * object);
272
273 static void gst_queue2_set_property (GObject * object,
274     guint prop_id, const GValue * value, GParamSpec * pspec);
275 static void gst_queue2_get_property (GObject * object,
276     guint prop_id, GValue * value, GParamSpec * pspec);
277
278 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
279     GstBuffer * buffer);
280 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
281     GstBufferList * buffer_list);
282 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
283 static void gst_queue2_loop (GstPad * pad);
284
285 static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad,
286     GstObject * parent, GstEvent * event);
287 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
288     GstQuery * query);
289
290 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
291     GstEvent * event);
292 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
293     GstQuery * query);
294 static gboolean gst_queue2_handle_query (GstElement * element,
295     GstQuery * query);
296
297 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
298     guint64 offset, guint length, GstBuffer ** buffer);
299
300 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
301     GstPadMode mode, gboolean active);
302 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
303     GstPadMode mode, gboolean active);
304 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
305     GstStateChange transition);
306
307 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
308 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
309
310 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
311 static void update_in_rates (GstQueue2 * queue, gboolean force);
312 static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue);
313 static void gst_queue2_post_buffering (GstQueue2 * queue);
314 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
315 static gboolean change_current_range(GstQueue2 * queue, GstQueue2Range *req_range, guint64 offset, guint length);
316 #endif
317
318 typedef enum
319 {
320   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
321   GST_QUEUE2_ITEM_TYPE_BUFFER,
322   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
323   GST_QUEUE2_ITEM_TYPE_EVENT,
324   GST_QUEUE2_ITEM_TYPE_QUERY
325 } GstQueue2ItemType;
326
327 typedef struct
328 {
329   GstQueue2ItemType type;
330   GstMiniObject *item;
331 } GstQueue2Item;
332
333 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
334
335 static void
336 gst_queue2_class_init (GstQueue2Class * klass)
337 {
338   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
339   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
340
341   gobject_class->set_property = gst_queue2_set_property;
342   gobject_class->get_property = gst_queue2_get_property;
343
344   /* properties */
345   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
346       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
347           "Current amount of data in the queue (bytes)",
348           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
350       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
351           "Current number of buffers in the queue",
352           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
353   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
354       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
355           "Current amount of data in the queue (in ns)",
356           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
357
358   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
359       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
360           "Max. amount of data in the queue (bytes, 0=disable)",
361           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
362           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
363           G_PARAM_STATIC_STRINGS));
364   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
365       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
366           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
367           DEFAULT_MAX_SIZE_BUFFERS,
368           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
369           G_PARAM_STATIC_STRINGS));
370   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
371       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
372           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
373           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
374           G_PARAM_STATIC_STRINGS));
375
376   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
377       g_param_spec_boolean ("use-buffering", "Use buffering",
378           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
379           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
380           G_PARAM_STATIC_STRINGS));
381   g_object_class_install_property (gobject_class, PROP_USE_TAGS_BITRATE,
382       g_param_spec_boolean ("use-tags-bitrate", "Use bitrate from tags",
383           "Use a bitrate from upstream tags to estimate buffer duration if not provided",
384           DEFAULT_USE_TAGS_BITRATE,
385           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
386           G_PARAM_STATIC_STRINGS));
387   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
388       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
389           "Estimate the bitrate of the stream to calculate time level",
390           DEFAULT_USE_RATE_ESTIMATE,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
393       g_param_spec_int ("low-percent", "Low percent",
394           "Low threshold for buffering to start. Only used if use-buffering is True "
395           "(Deprecated: use low-watermark instead)",
396           0, 100, DEFAULT_LOW_WATERMARK * 100,
397           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
399       g_param_spec_int ("high-percent", "High percent",
400           "High threshold for buffering to finish. Only used if use-buffering is True "
401           "(Deprecated: use high-watermark instead)",
402           0, 100, DEFAULT_HIGH_WATERMARK * 100,
403           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404   g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
405       g_param_spec_double ("low-watermark", "Low watermark",
406           "Low threshold for buffering to start. Only used if use-buffering is True",
407           0.0, 1.0, DEFAULT_LOW_WATERMARK,
408           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409   g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
410       g_param_spec_double ("high-watermark", "High watermark",
411           "High threshold for buffering to finish. Only used if use-buffering is True",
412           0.0, 1.0, DEFAULT_HIGH_WATERMARK,
413           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
414
415   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
416       g_param_spec_string ("temp-template", "Temporary File Template",
417           "File template to store temporary files in, should contain directory "
418           "and XXXXXX. (NULL == disabled)",
419           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
420
421   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
422       g_param_spec_string ("temp-location", "Temporary File Location",
423           "Location to store temporary files in (Only read this property, "
424           "use temp-template to configure the name template)",
425           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
426
427   /**
428    * GstQueue2:temp-remove
429    *
430    * When temp-template is set, remove the temporary file when going to READY.
431    */
432   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
433       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
434           "Remove the temp-location after use",
435           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
436
437   /**
438    * GstQueue2:ring-buffer-max-size
439    *
440    * The maximum size of the ring buffer in bytes. If set to 0, the ring
441    * buffer is disabled. Default 0.
442    */
443   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
444       g_param_spec_uint64 ("ring-buffer-max-size",
445           "Max. ring buffer size (bytes)",
446           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
447           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
448           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
449
450   /**
451    * GstQueue2:avg-in-rate
452    *
453    * The average input data rate.
454    */
455   g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
456       g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
457           "Average input data rate (bytes/s)",
458           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
459
460 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
461   /**
462    * GstQueue2:buffer-mode:
463    *
464    * Control the buffering mode used by the queue2.
465    */
466   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
467       g_param_spec_enum ("buffer-mode", "Buffer Mode",
468           "Control the buffering algorithm in use",
469           GST_TYPE_BUFFERING_MODE, GST_BUFFERING_STREAM,
470           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
471 #endif
472   /* set several parent class virtual functions */
473   gobject_class->finalize = gst_queue2_finalize;
474
475   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
476   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
477
478   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
479       "Generic",
480       "Simple data queue",
481       "Erik Walthinsen <omega@cse.ogi.edu>, "
482       "Wim Taymans <wim.taymans@gmail.com>");
483
484   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
485   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
486 }
487
488 static void
489 gst_queue2_init (GstQueue2 * queue)
490 {
491   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
492
493   gst_pad_set_chain_function (queue->sinkpad,
494       GST_DEBUG_FUNCPTR (gst_queue2_chain));
495   gst_pad_set_chain_list_function (queue->sinkpad,
496       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
497   gst_pad_set_activatemode_function (queue->sinkpad,
498       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
499   gst_pad_set_event_full_function (queue->sinkpad,
500       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
501   gst_pad_set_query_function (queue->sinkpad,
502       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
503   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
504   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
505
506   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
507
508   gst_pad_set_activatemode_function (queue->srcpad,
509       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
510   gst_pad_set_getrange_function (queue->srcpad,
511       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
512   gst_pad_set_event_function (queue->srcpad,
513       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
514   gst_pad_set_query_function (queue->srcpad,
515       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
516   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
517   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
518
519   /* levels */
520   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
521   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
522   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
523   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
524   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
525   queue->use_buffering = DEFAULT_USE_BUFFERING;
526   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
527   queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
528   queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
529
530   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
531   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
532
533   queue->sinktime = GST_CLOCK_TIME_NONE;
534   queue->srctime = GST_CLOCK_TIME_NONE;
535   queue->sink_tainted = TRUE;
536   queue->src_tainted = TRUE;
537
538   queue->srcresult = GST_FLOW_FLUSHING;
539   queue->sinkresult = GST_FLOW_FLUSHING;
540   queue->is_eos = FALSE;
541   queue->in_timer = g_timer_new ();
542   queue->out_timer = g_timer_new ();
543
544   g_mutex_init (&queue->qlock);
545   queue->waiting_add = FALSE;
546   g_cond_init (&queue->item_add);
547   queue->waiting_del = FALSE;
548   g_cond_init (&queue->item_del);
549   g_queue_init (&queue->queue);
550
551   g_cond_init (&queue->query_handled);
552   queue->last_query = FALSE;
553
554   g_mutex_init (&queue->buffering_post_lock);
555   queue->buffering_percent = 100;
556
557   /* tempfile related */
558   queue->temp_template = NULL;
559   queue->temp_location = NULL;
560   queue->temp_remove = DEFAULT_TEMP_REMOVE;
561
562   queue->ring_buffer = NULL;
563   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
564 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
565   queue->mode = -1;
566 #endif
567   GST_DEBUG_OBJECT (queue,
568       "initialized queue's not_empty & not_full conditions");
569 }
570
571 /* called only once, as opposed to dispose */
572 static void
573 gst_queue2_finalize (GObject * object)
574 {
575   GstQueue2 *queue = GST_QUEUE2 (object);
576
577   GST_DEBUG_OBJECT (queue, "finalizing queue");
578
579   while (!g_queue_is_empty (&queue->queue)) {
580     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
581
582     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
583       gst_mini_object_unref (qitem->item);
584     g_slice_free (GstQueue2Item, qitem);
585   }
586
587   queue->last_query = FALSE;
588   g_queue_clear (&queue->queue);
589   g_mutex_clear (&queue->qlock);
590   g_mutex_clear (&queue->buffering_post_lock);
591   g_cond_clear (&queue->item_add);
592   g_cond_clear (&queue->item_del);
593   g_cond_clear (&queue->query_handled);
594   g_timer_destroy (queue->in_timer);
595   g_timer_destroy (queue->out_timer);
596
597   /* temp_file path cleanup  */
598   g_free (queue->temp_template);
599   g_free (queue->temp_location);
600
601   G_OBJECT_CLASS (parent_class)->finalize (object);
602 }
603
604 static void
605 debug_ranges (GstQueue2 * queue)
606 {
607   GstQueue2Range *walk;
608
609   for (walk = queue->ranges; walk; walk = walk->next) {
610     GST_DEBUG_OBJECT (queue,
611         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
612         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
613         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
614         walk->rb_writing_pos, walk->reading_pos,
615         walk == queue->current ? "**y**" : "  n  ");
616   }
617 }
618
619 /* clear all the downloaded ranges */
620 static void
621 clean_ranges (GstQueue2 * queue)
622 {
623   GST_DEBUG_OBJECT (queue, "clean queue ranges");
624
625   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
626   queue->ranges = NULL;
627   queue->current = NULL;
628 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
629   queue->read = NULL;
630 #endif
631 }
632
633 /* find a range that contains @offset or NULL when nothing does */
634 static GstQueue2Range *
635 find_range (GstQueue2 * queue, guint64 offset)
636 {
637   GstQueue2Range *range = NULL;
638   GstQueue2Range *walk;
639
640   /* first do a quick check for the current range */
641   for (walk = queue->ranges; walk; walk = walk->next) {
642     if (offset >= walk->offset && offset <= walk->writing_pos) {
643       /* we can reuse an existing range */
644       range = walk;
645       break;
646     }
647   }
648   if (range) {
649     GST_DEBUG_OBJECT (queue,
650         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
651         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
652   } else {
653     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
654   }
655   return range;
656 }
657
658 static void
659 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
660 {
661   guint64 max_reading_pos, writing_pos;
662
663   writing_pos = range->writing_pos;
664   max_reading_pos = range->max_reading_pos;
665
666   if (writing_pos > max_reading_pos)
667     queue->cur_level.bytes = writing_pos - max_reading_pos;
668   else
669     queue->cur_level.bytes = 0;
670 }
671
672 /* make a new range for @offset or reuse an existing range */
673 static GstQueue2Range *
674 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
675 {
676   GstQueue2Range *range, *prev, *next;
677
678   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
679
680   if ((range = find_range (queue, offset))) {
681     GST_DEBUG_OBJECT (queue,
682         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
683         range->writing_pos);
684     if (update_existing && range->writing_pos != offset) {
685       GST_DEBUG_OBJECT (queue, "updating range writing position to "
686           "%" G_GUINT64_FORMAT, offset);
687       range->writing_pos = offset;
688     }
689   } else {
690     GST_DEBUG_OBJECT (queue,
691         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
692
693     range = g_slice_new0 (GstQueue2Range);
694     range->offset = offset;
695     /* we want to write to the next location in the ring buffer */
696     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
697     range->writing_pos = offset;
698     range->rb_writing_pos = range->rb_offset;
699     range->reading_pos = offset;
700     range->max_reading_pos = offset;
701
702     /* insert sorted */
703     prev = NULL;
704     next = queue->ranges;
705     while (next) {
706       if (next->offset > offset) {
707         /* insert before next */
708         GST_DEBUG_OBJECT (queue,
709             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
710             next->offset);
711         break;
712       }
713       /* try next */
714       prev = next;
715       next = next->next;
716     }
717     range->next = next;
718     if (prev)
719       prev->next = range;
720     else
721       queue->ranges = range;
722   }
723   debug_ranges (queue);
724
725   /* update the stats for this range */
726   update_cur_level (queue, range);
727
728   return range;
729 }
730
731
732 /* clear and init the download ranges for offset 0 */
733 static void
734 init_ranges (GstQueue2 * queue)
735 {
736   GST_DEBUG_OBJECT (queue, "init queue ranges");
737
738   /* get rid of all the current ranges */
739   clean_ranges (queue);
740   /* make a range for offset 0 */
741 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
742   queue->current = queue->read = add_range (queue, 0, TRUE);
743 #else
744   queue->current = add_range (queue, 0, TRUE);
745 #endif
746 }
747
748 /* calculate the diff between running time on the sink and src of the queue.
749  * This is the total amount of time in the queue. */
750 static void
751 update_time_level (GstQueue2 * queue)
752 {
753   if (queue->sink_tainted) {
754     queue->sinktime =
755         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
756         queue->sink_segment.position);
757     queue->sink_tainted = FALSE;
758   }
759
760   if (queue->src_tainted) {
761     queue->srctime =
762         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
763         queue->src_segment.position);
764     queue->src_tainted = FALSE;
765   }
766
767   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
768       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
769
770   if (queue->sinktime != GST_CLOCK_TIME_NONE
771       && queue->srctime != GST_CLOCK_TIME_NONE
772       && queue->sinktime >= queue->srctime)
773     queue->cur_level.time = queue->sinktime - queue->srctime;
774   else
775     queue->cur_level.time = 0;
776 }
777
778 /* take a SEGMENT event and apply the values to segment, updating the time
779  * level of queue. */
780 static void
781 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
782     gboolean is_sink)
783 {
784   gst_event_copy_segment (event, segment);
785
786   if (segment->format == GST_FORMAT_BYTES) {
787     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
788       /* start is where we'll be getting from and as such writing next */
789 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
790       queue->current = queue->read = add_range (queue, segment->start, TRUE);
791 #else
792       queue->current = add_range (queue, segment->start, TRUE);
793 #endif
794     }
795   }
796
797   /* now configure the values, we use these to track timestamps on the
798    * sinkpad. */
799   if (segment->format != GST_FORMAT_TIME) {
800     /* non-time format, pretend the current time segment is closed with a
801      * 0 start and unknown stop time. */
802     segment->format = GST_FORMAT_TIME;
803     segment->start = 0;
804     segment->stop = -1;
805     segment->time = 0;
806   }
807
808   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
809
810   if (is_sink)
811     queue->sink_tainted = TRUE;
812   else
813     queue->src_tainted = TRUE;
814
815   /* segment can update the time level of the queue */
816   update_time_level (queue);
817 }
818
819 static void
820 apply_gap (GstQueue2 * queue, GstEvent * event,
821     GstSegment * segment, gboolean is_sink)
822 {
823   GstClockTime timestamp;
824   GstClockTime duration;
825
826   gst_event_parse_gap (event, &timestamp, &duration);
827
828   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
829
830     if (GST_CLOCK_TIME_IS_VALID (duration)) {
831       timestamp += duration;
832     }
833
834     segment->position = timestamp;
835
836     if (is_sink)
837       queue->sink_tainted = TRUE;
838     else
839       queue->src_tainted = TRUE;
840
841     /* calc diff with other end */
842     update_time_level (queue);
843   }
844 }
845
846 /* take a buffer and update segment, updating the time level of the queue. */
847 static void
848 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
849     guint64 size, gboolean is_sink)
850 {
851   GstClockTime duration, timestamp;
852
853   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
854   duration = GST_BUFFER_DURATION (buffer);
855
856   /* If we have no duration, pick one from the bitrate if we can */
857   if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) {
858     guint bitrate =
859         is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
860     if (bitrate)
861       duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
862   }
863
864   /* if no timestamp is set, assume it's continuous with the previous
865    * time */
866   if (timestamp == GST_CLOCK_TIME_NONE)
867     timestamp = segment->position;
868
869   /* add duration */
870   if (duration != GST_CLOCK_TIME_NONE)
871     timestamp += duration;
872
873   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
874       GST_TIME_ARGS (timestamp));
875
876   segment->position = timestamp;
877
878   if (is_sink)
879     queue->sink_tainted = TRUE;
880   else
881     queue->src_tainted = TRUE;
882
883   /* calc diff with other end */
884   update_time_level (queue);
885 }
886
887 struct BufListData
888 {
889   GstClockTime timestamp;
890   guint bitrate;
891 };
892
893 static gboolean
894 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
895 {
896   struct BufListData *bld = data;
897   GstClockTime *timestamp = &bld->timestamp;
898   GstClockTime btime;
899
900   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
901       " duration %" GST_TIME_FORMAT, idx,
902       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
903       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
904       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
905
906   btime = GST_BUFFER_DTS_OR_PTS (*buf);
907   if (GST_CLOCK_TIME_IS_VALID (btime))
908     *timestamp = btime;
909
910   if (GST_BUFFER_DURATION_IS_VALID (*buf))
911     *timestamp += GST_BUFFER_DURATION (*buf);
912   else if (bld->bitrate != 0) {
913     guint64 size = gst_buffer_get_size (*buf);
914
915     /* If we have no duration, pick one from the bitrate if we can */
916     *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size);
917   }
918
919
920   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
921   return TRUE;
922 }
923
924 /* take a buffer list and update segment, updating the time level of the queue */
925 static void
926 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
927     GstSegment * segment, gboolean is_sink)
928 {
929   struct BufListData bld;
930
931   /* if no timestamp is set, assume it's continuous with the previous time */
932   bld.timestamp = segment->position;
933
934   if (queue->use_tags_bitrate) {
935     if (is_sink)
936       bld.bitrate = queue->sink_tags_bitrate;
937     else
938       bld.bitrate = queue->src_tags_bitrate;
939   } else
940     bld.bitrate = 0;
941
942   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
943
944   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
945       GST_TIME_ARGS (bld.timestamp));
946
947   segment->position = bld.timestamp;
948
949   if (is_sink)
950     queue->sink_tainted = TRUE;
951   else
952     queue->src_tainted = TRUE;
953
954   /* calc diff with other end */
955   update_time_level (queue);
956 }
957
958 static inline gint
959 normalize_to_buffering_level (guint64 cur_level, guint64 max_level,
960     guint64 alt_max)
961 {
962   guint64 p;
963
964   if (max_level == 0)
965     return 0;
966
967   if (alt_max > 0)
968     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL,
969         MIN (max_level, alt_max));
970   else
971     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level);
972
973   return MIN (p, MAX_BUFFERING_LEVEL);
974 }
975
976 static gboolean
977 get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
978     gint * buffering_level)
979 {
980   gint buflevel, buflevel2;
981
982   if (queue->high_watermark <= 0) {
983     if (buffering_level)
984       *buffering_level = MAX_BUFFERING_LEVEL;
985     if (is_buffering)
986       *is_buffering = FALSE;
987     return FALSE;
988   }
989 #define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
990     normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
991
992   if (queue->is_eos) {
993     /* on EOS we are always 100% full, we set the var here so that it we can
994      * reuse the logic below to stop buffering */
995     buflevel = MAX_BUFFERING_LEVEL;
996     GST_LOG_OBJECT (queue, "we are EOS");
997   } else {
998     GST_LOG_OBJECT (queue,
999         "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
1000         queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
1001         queue->cur_level.buffers);
1002
1003     /* figure out the buffering level we are filled, we take the max of all formats. */
1004     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1005       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0);
1006     } else {
1007       guint64 rb_size = queue->ring_buffer_max_size;
1008       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size);
1009     }
1010
1011     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0);
1012     buflevel = MAX (buflevel, buflevel2);
1013
1014     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0);
1015     buflevel = MAX (buflevel, buflevel2);
1016
1017     /* also apply the rate estimate when we need to */
1018     if (queue->use_rate_estimate) {
1019       buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0);
1020       buflevel = MAX (buflevel, buflevel2);
1021     }
1022
1023     /* Don't get to 0% unless we're really empty */
1024     if (queue->cur_level.bytes > 0)
1025       buflevel = MAX (1, buflevel);
1026   }
1027 #undef GET_BUFFER_LEVEL_FOR_QUANTITY
1028
1029   if (is_buffering)
1030     *is_buffering = queue->is_buffering;
1031
1032   if (buffering_level)
1033     *buffering_level = buflevel;
1034
1035   GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering,
1036       buflevel);
1037
1038   return TRUE;
1039 }
1040
1041 static gint
1042 convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level)
1043 {
1044   int percent;
1045
1046   /* scale so that if buffering_level equals the high watermark,
1047    * the percentage is 100% */
1048   percent = buffering_level * 100 / queue->high_watermark;
1049   /* clip */
1050   if (percent > 100)
1051     percent = 100;
1052
1053   return percent;
1054 }
1055
1056 static void
1057 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
1058     gint * avg_in, gint * avg_out, gint64 * buffering_left)
1059 {
1060 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
1061   if (queue->mode != -1) {
1062     *mode = queue->mode;
1063   } else {
1064     if (mode) {
1065       if (!QUEUE_IS_USING_QUEUE (queue)) {
1066         if (QUEUE_IS_USING_RING_BUFFER (queue))
1067           *mode = GST_BUFFERING_TIMESHIFT;
1068         else
1069           *mode = GST_BUFFERING_DOWNLOAD;
1070       } else {
1071         *mode = GST_BUFFERING_STREAM;
1072       }
1073     }
1074   }
1075 #else
1076   if (mode) {
1077     if (!QUEUE_IS_USING_QUEUE (queue)) {
1078       if (QUEUE_IS_USING_RING_BUFFER (queue))
1079         *mode = GST_BUFFERING_TIMESHIFT;
1080       else
1081         *mode = GST_BUFFERING_DOWNLOAD;
1082     } else {
1083       *mode = GST_BUFFERING_STREAM;
1084     }
1085   }
1086 #endif
1087
1088   if (avg_in)
1089     *avg_in = queue->byte_in_rate;
1090   if (avg_out)
1091     *avg_out = queue->byte_out_rate;
1092
1093   if (buffering_left) {
1094     *buffering_left = (percent == 100 ? 0 : -1);
1095
1096     if (queue->use_rate_estimate) {
1097       guint64 max, cur;
1098
1099       max = queue->max_level.rate_time;
1100       cur = queue->cur_level.rate_time;
1101
1102       if (percent != 100 && max > cur)
1103         *buffering_left = (max - cur) / 1000000;
1104     }
1105   }
1106 }
1107
1108 /* Called with the lock taken */
1109 static GstMessage *
1110 gst_queue2_get_buffering_message (GstQueue2 * queue)
1111 {
1112   GstMessage *msg = NULL;
1113
1114   if (queue->percent_changed) {
1115     gint percent = queue->buffering_percent;
1116
1117     queue->percent_changed = FALSE;
1118
1119     GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
1120     msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
1121
1122     gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
1123         queue->avg_out, queue->buffering_left);
1124   }
1125
1126   return msg;
1127 }
1128
1129 static void
1130 gst_queue2_post_buffering (GstQueue2 * queue)
1131 {
1132   GstMessage *msg = NULL;
1133
1134   g_mutex_lock (&queue->buffering_post_lock);
1135   GST_QUEUE2_MUTEX_LOCK (queue);
1136   msg = gst_queue2_get_buffering_message (queue);
1137   GST_QUEUE2_MUTEX_UNLOCK (queue);
1138
1139   if (msg != NULL)
1140     gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
1141
1142   g_mutex_unlock (&queue->buffering_post_lock);
1143 }
1144
1145 static void
1146 update_buffering (GstQueue2 * queue)
1147 {
1148   gint buffering_level, percent;
1149 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1150   GstQueue2Range *range;
1151
1152   if (queue->read)
1153     range = queue->read;
1154   else
1155     range = queue->current;
1156 #endif
1157
1158   /* Ensure the variables used to calculate buffering state are up-to-date. */
1159 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1160   if (range)
1161     update_cur_level (queue, range);
1162 #else
1163   if (queue->current)
1164     update_cur_level (queue, queue->current);
1165 #endif
1166   update_in_rates (queue, FALSE);
1167
1168   if (!get_buffering_level (queue, NULL, &buffering_level))
1169     return;
1170
1171   percent = convert_to_buffering_percent (queue, buffering_level);
1172
1173   if (queue->is_buffering) {
1174     /* if we were buffering see if we reached the high watermark */
1175     if (percent >= 100)
1176       queue->is_buffering = FALSE;
1177
1178     SET_PERCENT (queue, percent);
1179   } else {
1180     /* we were not buffering, check if we need to start buffering if we drop
1181      * below the low threshold */
1182     if (buffering_level < queue->low_watermark) {
1183       queue->is_buffering = TRUE;
1184       SET_PERCENT (queue, percent);
1185     }
1186   }
1187 }
1188
1189 static void
1190 reset_rate_timer (GstQueue2 * queue)
1191 {
1192   queue->bytes_in = 0;
1193   queue->bytes_out = 0;
1194   queue->byte_in_rate = 0.0;
1195   queue->byte_in_period = 0;
1196   queue->byte_out_rate = 0.0;
1197   queue->last_update_in_rates_elapsed = 0.0;
1198   queue->last_in_elapsed = 0.0;
1199   queue->last_out_elapsed = 0.0;
1200   queue->in_timer_started = FALSE;
1201   queue->out_timer_started = FALSE;
1202 }
1203
1204 /* the interval in seconds to recalculate the rate */
1205 #define RATE_INTERVAL    0.2
1206 /* Tuning for rate estimation. We use a large window for the input rate because
1207  * it should be stable when connected to a network. The output rate is less
1208  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1209  * therefore adapt more quickly.
1210  * However, initial input rate may be subject to a burst, and should therefore
1211  * initially also adapt more quickly to changes, and only later on give higher
1212  * weight to previous values. */
1213 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1214 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1215
1216 static void
1217 update_in_rates (GstQueue2 * queue, gboolean force)
1218 {
1219   gdouble elapsed, period;
1220   gdouble byte_in_rate;
1221
1222   if (!queue->in_timer_started) {
1223     queue->in_timer_started = TRUE;
1224     g_timer_start (queue->in_timer);
1225     return;
1226   }
1227
1228   queue->last_update_in_rates_elapsed = elapsed =
1229       g_timer_elapsed (queue->in_timer, NULL);
1230
1231   /* recalc after each interval. */
1232   if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1233     period = elapsed - queue->last_in_elapsed;
1234
1235     GST_DEBUG_OBJECT (queue,
1236         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1237         period, queue->bytes_in, queue->byte_in_period);
1238
1239     byte_in_rate = queue->bytes_in / period;
1240
1241     if (queue->byte_in_rate == 0.0)
1242       queue->byte_in_rate = byte_in_rate;
1243     else
1244       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1245           (double) queue->byte_in_period, period);
1246
1247     /* another data point, cap at 16 for long time running average */
1248     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1249       queue->byte_in_period += period;
1250
1251     /* reset the values to calculate rate over the next interval */
1252     queue->last_in_elapsed = elapsed;
1253     queue->bytes_in = 0;
1254   }
1255
1256   if (queue->byte_in_rate > 0.0) {
1257     queue->cur_level.rate_time =
1258         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1259   }
1260   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1261       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1262 }
1263
1264 static void
1265 update_out_rates (GstQueue2 * queue)
1266 {
1267   gdouble elapsed, period;
1268   gdouble byte_out_rate;
1269
1270   if (!queue->out_timer_started) {
1271     queue->out_timer_started = TRUE;
1272     g_timer_start (queue->out_timer);
1273     return;
1274   }
1275
1276   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1277
1278   /* recalc after each interval. */
1279   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1280     period = elapsed - queue->last_out_elapsed;
1281
1282     GST_DEBUG_OBJECT (queue,
1283         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1284
1285     byte_out_rate = queue->bytes_out / period;
1286
1287     if (queue->byte_out_rate == 0.0)
1288       queue->byte_out_rate = byte_out_rate;
1289     else
1290       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1291
1292     /* reset the values to calculate rate over the next interval */
1293     queue->last_out_elapsed = elapsed;
1294     queue->bytes_out = 0;
1295   }
1296   if (queue->byte_in_rate > 0.0) {
1297     queue->cur_level.rate_time =
1298         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1299   }
1300   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1301       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1302 }
1303
1304 static void
1305 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1306 {
1307   guint64 reading_pos, max_reading_pos;
1308
1309   reading_pos = pos;
1310   max_reading_pos = range->max_reading_pos;
1311
1312   max_reading_pos = MAX (max_reading_pos, reading_pos);
1313
1314   GST_DEBUG_OBJECT (queue,
1315       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1316       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1317   range->max_reading_pos = max_reading_pos;
1318
1319   update_cur_level (queue, range);
1320 }
1321
1322 static gboolean
1323 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1324 {
1325   GstEvent *event;
1326   gboolean res;
1327
1328   /* until we receive the FLUSH_STOP from this seek, we skip data */
1329   queue->seeking = TRUE;
1330   GST_QUEUE2_MUTEX_UNLOCK (queue);
1331
1332   debug_ranges (queue);
1333
1334   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1335
1336   event =
1337       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1338       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1339       GST_SEEK_TYPE_NONE, -1);
1340
1341   res = gst_pad_push_event (queue->sinkpad, event);
1342   GST_QUEUE2_MUTEX_LOCK (queue);
1343
1344   if (res) {
1345     /* Between us sending the seek event and re-acquiring the lock, the source
1346      * thread might already have pushed data and moved along the range's
1347      * writing_pos beyond the seek offset. In that case we don't want to set
1348      * the writing position back to the requested seek position, as it would
1349      * cause data to be written to the wrong offset in the file or ring buffer.
1350      * We still do the add_range call to switch the current range to the
1351      * requested range, or create one if one doesn't exist yet. */
1352 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1353     queue->current = queue->read = add_range (queue, offset, FALSE);
1354 #else
1355     queue->current = add_range (queue, offset, FALSE);
1356 #endif
1357   }
1358
1359   return res;
1360 }
1361
1362 /* get the threshold for when we decide to seek rather than wait */
1363 static guint64
1364 get_seek_threshold (GstQueue2 * queue)
1365 {
1366   guint64 threshold;
1367
1368   /* FIXME, find a good threshold based on the incoming rate. */
1369   threshold = 1024 * 512;
1370
1371   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1372     threshold = MIN (threshold,
1373         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1374   }
1375   return threshold;
1376 }
1377
1378 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1379 /* check the buffered data size, not to change range repeatedly.
1380  * changing range cause the new http connection and it makes buffering state frequently. */
1381 static gboolean
1382 change_current_range(GstQueue2 * queue, GstQueue2Range *req_range, guint64 offset, guint length)
1383 {
1384   guint64 curr_level = 0;
1385   guint64 curr_min_level = 0;
1386
1387   if (queue->current->writing_pos > queue->current->reading_pos)
1388     curr_level = queue->current->writing_pos - queue->current->reading_pos;
1389
1390   /* FIXME, find a good threshold based on the incoming rate and content bitrate. */
1391   curr_min_level = MIN((queue->max_level.bytes*0.05), get_seek_threshold(queue));
1392
1393   GST_DEBUG_OBJECT(queue, " curr[w%"G_GUINT64_FORMAT", r%"G_GUINT64_FORMAT
1394         ", d%"G_GUINT64_FORMAT", m%"G_GUINT64_FORMAT
1395         "] u[%"G_GUINT64_FORMAT"][EOS:%d]",
1396         queue->current->writing_pos, queue->current->reading_pos,
1397         curr_level, curr_min_level,
1398         queue->upstream_size, queue->is_eos);
1399
1400   /* FIXME, find a good threshold based on the incoming rate and content bitrate. */
1401   if ((queue->is_eos) ||
1402       (queue->upstream_size > 0 && queue->current->writing_pos >= queue->upstream_size) ||
1403       (curr_level >= curr_min_level)) {
1404     GST_DEBUG_OBJECT (queue, "Range Switching");
1405     return TRUE;
1406   } else {
1407     GST_DEBUG_OBJECT (queue, "keep receiving data without range switching.");
1408     return FALSE;
1409   }
1410 }
1411 #endif
1412
1413 /* see if there is enough data in the file to read a full buffer */
1414 static gboolean
1415 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1416 {
1417   GstQueue2Range *range;
1418 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1419   gboolean need_to_seek = TRUE;
1420 #endif
1421   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1422       offset, length);
1423
1424   if ((range = find_range (queue, offset))) {
1425 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1426     queue->read = range;
1427 #endif
1428     if (queue->current != range) {
1429       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1430 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1431       if (QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER(queue))
1432         need_to_seek = change_current_range(queue, range, offset, length);
1433
1434       if (need_to_seek == TRUE)
1435         perform_seek_to_offset (queue, range->writing_pos);
1436 #else
1437       perform_seek_to_offset (queue, range->writing_pos);
1438 #endif
1439     }
1440
1441     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1442         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1443
1444     /* we have a range for offset */
1445     GST_DEBUG_OBJECT (queue,
1446         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1447         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1448
1449     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1450       return TRUE;
1451
1452     if (offset + length <= range->writing_pos)
1453       return TRUE;
1454     else
1455       GST_DEBUG_OBJECT (queue,
1456           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1457           (offset + length) - range->writing_pos);
1458
1459   } else {
1460     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1461         " len %u", offset, length);
1462
1463 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1464     queue->read = queue->current;
1465 #endif
1466
1467     /* we don't have the range, see how far away we are */
1468     if (!queue->is_eos && queue->current) {
1469       guint64 threshold = get_seek_threshold (queue);
1470
1471       if (offset >= queue->current->offset && offset <=
1472           queue->current->writing_pos + threshold) {
1473         GST_INFO_OBJECT (queue,
1474             "requested data is within range, wait for data");
1475         return FALSE;
1476       }
1477     }
1478
1479     /* too far away, do a seek */
1480     perform_seek_to_offset (queue, offset);
1481   }
1482
1483   return FALSE;
1484 }
1485
1486 #ifdef HAVE_FSEEKO
1487 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1488 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1489 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1490 #else
1491 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1492 #endif
1493
1494 static GstFlowReturn
1495 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1496     guint8 * dst, gint64 * read_return)
1497 {
1498   guint8 *ring_buffer;
1499   size_t res;
1500
1501   ring_buffer = queue->ring_buffer;
1502
1503   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1504     goto seek_failed;
1505
1506   /* this should not block */
1507   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1508       length, offset);
1509   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1510     res = fread (dst, 1, length, queue->temp_file);
1511   } else {
1512     memcpy (dst, ring_buffer + offset, length);
1513     res = length;
1514   }
1515
1516   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1517
1518   if (G_UNLIKELY (res < length)) {
1519     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1520       goto could_not_read;
1521     /* check for errors or EOF */
1522     if (ferror (queue->temp_file))
1523       goto could_not_read;
1524     if (feof (queue->temp_file) && length > 0)
1525       goto eos;
1526   }
1527
1528   *read_return = res;
1529
1530   return GST_FLOW_OK;
1531
1532 seek_failed:
1533   {
1534     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1535     return GST_FLOW_ERROR;
1536   }
1537 could_not_read:
1538   {
1539     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1540     return GST_FLOW_ERROR;
1541   }
1542 eos:
1543   {
1544     GST_DEBUG ("non-regular file hits EOS");
1545     return GST_FLOW_EOS;
1546   }
1547 }
1548
1549 static GstFlowReturn
1550 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1551     GstBuffer ** buffer)
1552 {
1553   GstBuffer *buf;
1554   GstMapInfo info;
1555   guint8 *data;
1556   guint64 file_offset;
1557   guint block_length, remaining, read_length;
1558   guint64 rb_size;
1559   guint64 max_size;
1560   guint64 rpos;
1561   GstFlowReturn ret = GST_FLOW_OK;
1562
1563   /* allocate the output buffer of the requested size */
1564   if (*buffer == NULL)
1565     buf = gst_buffer_new_allocate (NULL, length, NULL);
1566   else
1567     buf = *buffer;
1568
1569   if (!gst_buffer_map (buf, &info, GST_MAP_WRITE))
1570     goto buffer_write_fail;
1571   data = info.data;
1572
1573   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1574       offset);
1575
1576   rpos = offset;
1577   rb_size = queue->ring_buffer_max_size;
1578   max_size = QUEUE_MAX_BYTES (queue);
1579
1580   remaining = length;
1581   while (remaining > 0) {
1582     /* configure how much/whether to read */
1583     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1584       read_length = 0;
1585
1586       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1587         guint64 level;
1588
1589         /* calculate how far away the offset is */
1590 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1591         if (queue->read->writing_pos > rpos)
1592           level = queue->read->writing_pos - rpos;
1593 #else
1594         if (queue->current->writing_pos > rpos)
1595           level = queue->current->writing_pos - rpos;
1596 #endif
1597         else
1598           level = 0;
1599
1600         GST_DEBUG_OBJECT (queue,
1601             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1602             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1603 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1604             rpos, queue->read->writing_pos, level, max_size);
1605 #else
1606             rpos, queue->current->writing_pos, level, max_size);
1607 #endif
1608
1609         if (level >= max_size) {
1610           /* we don't have the data but if we have a ring buffer that is full, we
1611            * need to read */
1612           GST_DEBUG_OBJECT (queue,
1613               "ring buffer full, reading QUEUE_MAX_BYTES %"
1614               G_GUINT64_FORMAT " bytes", max_size);
1615           read_length = max_size;
1616         } else if (queue->is_eos) {
1617           /* won't get any more data so read any data we have */
1618           if (level) {
1619             GST_DEBUG_OBJECT (queue,
1620                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1621                 level);
1622             read_length = level;
1623             remaining = level;
1624             length = level;
1625           } else
1626             goto hit_eos;
1627         }
1628       }
1629
1630       if (read_length == 0) {
1631         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1632 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1633           GST_DEBUG_OBJECT (queue,
1634               "update current position [%" G_GUINT64_FORMAT "-%"
1635               G_GUINT64_FORMAT "]", rpos, queue->read->max_reading_pos);
1636           update_cur_pos (queue, queue->read, rpos);
1637 #else
1638           GST_DEBUG_OBJECT (queue,
1639               "update current position [%" G_GUINT64_FORMAT "-%"
1640               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1641           update_cur_pos (queue, queue->current, rpos);
1642 #endif
1643           GST_QUEUE2_SIGNAL_DEL (queue);
1644         }
1645
1646         if (queue->use_buffering)
1647           update_buffering (queue);
1648
1649         GST_DEBUG_OBJECT (queue, "waiting for add");
1650         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1651         continue;
1652       }
1653     } else {
1654       /* we have the requested data so read it */
1655       read_length = remaining;
1656     }
1657
1658     /* set range reading_pos to actual reading position for this read */
1659 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1660     queue->read->reading_pos = rpos;
1661 #else
1662     queue->current->reading_pos = rpos;
1663 #endif
1664
1665     /* configure how much and from where to read */
1666     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1667       file_offset =
1668 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1669           (queue->read->rb_offset + (rpos -
1670               queue->read->offset)) % rb_size;
1671 #else
1672           (queue->current->rb_offset + (rpos -
1673               queue->current->offset)) % rb_size;
1674 #endif
1675       if (file_offset + read_length > rb_size) {
1676         block_length = rb_size - file_offset;
1677       } else {
1678         block_length = read_length;
1679       }
1680     } else {
1681       file_offset = rpos;
1682       block_length = read_length;
1683     }
1684
1685     /* while we still have data to read, we loop */
1686     while (read_length > 0) {
1687       gint64 read_return;
1688
1689       ret =
1690           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1691           data, &read_return);
1692       if (ret != GST_FLOW_OK)
1693         goto read_error;
1694
1695       file_offset += read_return;
1696       if (QUEUE_IS_USING_RING_BUFFER (queue))
1697         file_offset %= rb_size;
1698
1699       data += read_return;
1700       read_length -= read_return;
1701       block_length = read_length;
1702       remaining -= read_return;
1703
1704 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1705       rpos = (queue->read->reading_pos += read_return);
1706       update_cur_pos (queue, queue->read, queue->read->reading_pos);
1707 #else
1708       rpos = (queue->current->reading_pos += read_return);
1709       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1710 #endif
1711     }
1712     GST_QUEUE2_SIGNAL_DEL (queue);
1713     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1714   }
1715
1716   gst_buffer_unmap (buf, &info);
1717   gst_buffer_resize (buf, 0, length);
1718
1719   GST_BUFFER_OFFSET (buf) = offset;
1720   GST_BUFFER_OFFSET_END (buf) = offset + length;
1721
1722   *buffer = buf;
1723
1724   return ret;
1725
1726   /* ERRORS */
1727 hit_eos:
1728   {
1729     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1730     gst_buffer_unmap (buf, &info);
1731     if (*buffer == NULL)
1732       gst_buffer_unref (buf);
1733     return GST_FLOW_EOS;
1734   }
1735 out_flushing:
1736   {
1737     GST_DEBUG_OBJECT (queue, "we are flushing");
1738     gst_buffer_unmap (buf, &info);
1739     if (*buffer == NULL)
1740       gst_buffer_unref (buf);
1741     return GST_FLOW_FLUSHING;
1742   }
1743 read_error:
1744   {
1745     GST_DEBUG_OBJECT (queue, "we have a read error");
1746     gst_buffer_unmap (buf, &info);
1747     if (*buffer == NULL)
1748       gst_buffer_unref (buf);
1749     return ret;
1750   }
1751 buffer_write_fail:
1752   {
1753     GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL),
1754         ("Can't write to buffer"));
1755     if (*buffer == NULL)
1756       gst_buffer_unref (buf);
1757     return GST_FLOW_ERROR;
1758   }
1759 }
1760
1761 /* should be called with QUEUE_LOCK */
1762 static GstMiniObject *
1763 gst_queue2_read_item_from_file (GstQueue2 * queue)
1764 {
1765   GstMiniObject *item;
1766
1767   if (queue->stream_start_event != NULL) {
1768     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1769     queue->stream_start_event = NULL;
1770   } else if (queue->starting_segment != NULL) {
1771     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1772     queue->starting_segment = NULL;
1773   } else {
1774     GstFlowReturn ret;
1775     GstBuffer *buffer = NULL;
1776     guint64 reading_pos;
1777
1778 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
1779     reading_pos = queue->read->reading_pos;
1780 #else
1781     reading_pos = queue->current->reading_pos;
1782 #endif
1783
1784     ret =
1785         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1786         &buffer);
1787
1788     switch (ret) {
1789       case GST_FLOW_OK:
1790         item = GST_MINI_OBJECT_CAST (buffer);
1791         break;
1792       case GST_FLOW_EOS:
1793         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1794         break;
1795       default:
1796         item = NULL;
1797         break;
1798     }
1799   }
1800   return item;
1801 }
1802
1803 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1804  * the temp filename. */
1805 static gboolean
1806 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1807 {
1808   gint fd = -1;
1809   gchar *name = NULL;
1810
1811   if (queue->temp_file)
1812     goto already_opened;
1813
1814   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1815
1816   /* If temp_template was set, allocate a filename and open that file */
1817
1818   /* nothing to do */
1819   if (queue->temp_template == NULL)
1820     goto no_directory;
1821
1822   /* make copy of the template, we don't want to change this */
1823   name = g_strdup (queue->temp_template);
1824
1825 #ifdef __BIONIC__
1826   fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1827 #else
1828   fd = g_mkstemp (name);
1829 #endif
1830
1831   if (fd == -1)
1832     goto mkstemp_failed;
1833
1834   /* open the file for update/writing */
1835   queue->temp_file = fdopen (fd, "wb+");
1836   /* error creating file */
1837   if (queue->temp_file == NULL)
1838     goto open_failed;
1839
1840   g_free (queue->temp_location);
1841   queue->temp_location = name;
1842
1843   GST_QUEUE2_MUTEX_UNLOCK (queue);
1844
1845   /* we can't emit the notify with the lock */
1846   g_object_notify (G_OBJECT (queue), "temp-location");
1847
1848   GST_QUEUE2_MUTEX_LOCK (queue);
1849
1850   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1851
1852   return TRUE;
1853
1854   /* ERRORS */
1855 already_opened:
1856   {
1857     GST_DEBUG_OBJECT (queue, "temp file was already open");
1858     return TRUE;
1859   }
1860 no_directory:
1861   {
1862     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1863         (_("No Temp directory specified.")), (NULL));
1864     return FALSE;
1865   }
1866 mkstemp_failed:
1867   {
1868     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1869         (_("Could not create temp file \"%s\"."), queue->temp_template),
1870         GST_ERROR_SYSTEM);
1871     g_free (name);
1872     return FALSE;
1873   }
1874 open_failed:
1875   {
1876     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1877         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1878     g_free (name);
1879     if (fd != -1)
1880       close (fd);
1881     return FALSE;
1882   }
1883 }
1884
1885 static void
1886 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1887 {
1888   /* nothing to do */
1889   if (queue->temp_file == NULL)
1890     return;
1891
1892   GST_DEBUG_OBJECT (queue, "closing temp file");
1893
1894   fflush (queue->temp_file);
1895   fclose (queue->temp_file);
1896
1897   if (queue->temp_remove) {
1898     if (remove (queue->temp_location) < 0) {
1899       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1900           queue->temp_location, g_strerror (errno));
1901     }
1902   }
1903
1904   queue->temp_file = NULL;
1905   clean_ranges (queue);
1906 }
1907
1908 static void
1909 gst_queue2_flush_temp_file (GstQueue2 * queue)
1910 {
1911   if (queue->temp_file == NULL)
1912     return;
1913
1914   GST_DEBUG_OBJECT (queue, "flushing temp file");
1915
1916   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1917 }
1918
1919 static void
1920 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1921 {
1922   if (!QUEUE_IS_USING_QUEUE (queue)) {
1923     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1924       gst_queue2_flush_temp_file (queue);
1925     init_ranges (queue);
1926   } else {
1927     while (!g_queue_is_empty (&queue->queue)) {
1928       GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1929
1930       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1931           && GST_EVENT_IS_STICKY (qitem->item)
1932           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1933           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1934         gst_pad_store_sticky_event (queue->srcpad,
1935             GST_EVENT_CAST (qitem->item));
1936       }
1937
1938       /* Then lose another reference because we are supposed to destroy that
1939          data when flushing */
1940       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1941         gst_mini_object_unref (qitem->item);
1942       g_slice_free (GstQueue2Item, qitem);
1943     }
1944   }
1945   queue->last_query = FALSE;
1946   g_cond_signal (&queue->query_handled);
1947   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1948   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1949   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1950   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1951   queue->sink_tainted = queue->src_tainted = TRUE;
1952   if (queue->starting_segment != NULL)
1953     gst_event_unref (queue->starting_segment);
1954   queue->starting_segment = NULL;
1955   queue->segment_event_received = FALSE;
1956   gst_event_replace (&queue->stream_start_event, NULL);
1957
1958   /* we deleted a lot of something */
1959   GST_QUEUE2_SIGNAL_DEL (queue);
1960 }
1961
1962 static gboolean
1963 gst_queue2_wait_free_space (GstQueue2 * queue)
1964 {
1965   /* We make space available if we're "full" according to whatever
1966    * the user defined as "full". */
1967   if (gst_queue2_is_filled (queue)) {
1968     gboolean started;
1969
1970     /* pause the timer while we wait. The fact that we are waiting does not mean
1971      * the byterate on the input pad is lower */
1972     if ((started = queue->in_timer_started))
1973       g_timer_stop (queue->in_timer);
1974
1975     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1976         "queue is full, waiting for free space");
1977     do {
1978       /* Wait for space to be available, we could be unlocked because of a flush. */
1979       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1980     }
1981     while (gst_queue2_is_filled (queue));
1982
1983     /* and continue if we were running before */
1984     if (started)
1985       g_timer_continue (queue->in_timer);
1986   }
1987   return TRUE;
1988
1989   /* ERRORS */
1990 out_flushing:
1991   {
1992     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1993     return FALSE;
1994   }
1995 }
1996
1997 static gboolean
1998 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1999 {
2000   GstMapInfo info;
2001   guint8 *data, *ring_buffer;
2002   guint size, rb_size;
2003   guint64 writing_pos, new_writing_pos;
2004   GstQueue2Range *range, *prev, *next;
2005   gboolean do_seek = FALSE;
2006
2007   if (QUEUE_IS_USING_RING_BUFFER (queue))
2008     writing_pos = queue->current->rb_writing_pos;
2009   else
2010     writing_pos = queue->current->writing_pos;
2011   ring_buffer = queue->ring_buffer;
2012   rb_size = queue->ring_buffer_max_size;
2013
2014   if (!gst_buffer_map (buffer, &info, GST_MAP_READ))
2015     goto buffer_read_error;
2016
2017   size = info.size;
2018   data = info.data;
2019
2020   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
2021       writing_pos);
2022
2023   /* sanity check */
2024   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
2025       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
2026     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
2027         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
2028         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
2029   }
2030
2031   while (size > 0) {
2032     guint to_write;
2033
2034     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2035       gint64 space;
2036
2037       /* calculate the space in the ring buffer not used by data from
2038        * the current range */
2039       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
2040         /* wait until there is some free space */
2041         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
2042       }
2043       /* get the amount of space we have */
2044       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
2045
2046       /* calculate if we need to split or if we can write the entire
2047        * buffer now */
2048       to_write = MIN (size, space);
2049
2050       /* the writing position in the ring buffer after writing (part
2051        * or all of) the buffer */
2052       new_writing_pos = (writing_pos + to_write) % rb_size;
2053
2054       prev = NULL;
2055       range = queue->ranges;
2056
2057       /* if we need to overwrite data in the ring buffer, we need to
2058        * update the ranges
2059        *
2060        * warning: this code is complicated and includes some
2061        * simplifications - pen, paper and diagrams for the cases
2062        * recommended! */
2063       while (range) {
2064         guint64 range_data_start, range_data_end;
2065         GstQueue2Range *range_to_destroy = NULL;
2066
2067         if (range == queue->current)
2068           goto next_range;
2069
2070         range_data_start = range->rb_offset;
2071         range_data_end = range->rb_writing_pos;
2072
2073         /* handle the special case where the range has no data in it */
2074         if (range->writing_pos == range->offset) {
2075           if (range != queue->current) {
2076             GST_DEBUG_OBJECT (queue,
2077                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
2078                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
2079             /* remove range */
2080             range_to_destroy = range;
2081             if (prev)
2082               prev->next = range->next;
2083           }
2084           goto next_range;
2085         }
2086
2087         if (range_data_end > range_data_start) {
2088           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
2089             goto next_range;
2090
2091           if (new_writing_pos > range_data_start) {
2092             if (new_writing_pos >= range_data_end) {
2093               GST_DEBUG_OBJECT (queue,
2094                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
2095                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
2096               /* remove range */
2097               range_to_destroy = range;
2098               if (prev)
2099                 prev->next = range->next;
2100             } else {
2101               GST_DEBUG_OBJECT (queue,
2102                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
2103                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
2104                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
2105                   range->offset + new_writing_pos - range_data_start,
2106                   new_writing_pos);
2107               range->offset += (new_writing_pos - range_data_start);
2108               range->rb_offset = new_writing_pos;
2109             }
2110           }
2111         } else {
2112           guint64 new_wpos_virt = writing_pos + to_write;
2113
2114           if (new_wpos_virt <= range_data_start)
2115             goto next_range;
2116
2117           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
2118             GST_DEBUG_OBJECT (queue,
2119                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
2120                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
2121             /* remove range */
2122             range_to_destroy = range;
2123             if (prev)
2124               prev->next = range->next;
2125           } else {
2126             GST_DEBUG_OBJECT (queue,
2127                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
2128                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
2129                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
2130                 range->offset + new_writing_pos - range_data_start,
2131                 new_writing_pos);
2132             range->offset += (new_wpos_virt - range_data_start);
2133             range->rb_offset = new_writing_pos;
2134           }
2135         }
2136
2137       next_range:
2138         if (!range_to_destroy)
2139           prev = range;
2140
2141         range = range->next;
2142         if (range_to_destroy) {
2143           if (range_to_destroy == queue->ranges)
2144             queue->ranges = range;
2145           g_slice_free (GstQueue2Range, range_to_destroy);
2146           range_to_destroy = NULL;
2147         }
2148       }
2149     } else {
2150       to_write = size;
2151       new_writing_pos = writing_pos + to_write;
2152     }
2153
2154     if (QUEUE_IS_USING_TEMP_FILE (queue)
2155         && FSEEK_FILE (queue->temp_file, writing_pos))
2156       goto seek_failed;
2157
2158     if (new_writing_pos > writing_pos) {
2159       GST_INFO_OBJECT (queue,
2160           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
2161           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
2162           queue->current->writing_pos, queue->current->rb_writing_pos);
2163       /* either not using ring buffer or no wrapping, just write */
2164       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2165         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
2166           goto handle_error;
2167       } else {
2168         memcpy (ring_buffer + writing_pos, data, to_write);
2169       }
2170
2171       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
2172         /* try to merge with next range */
2173         while ((next = queue->current->next)) {
2174           GST_INFO_OBJECT (queue,
2175               "checking merge with next range %" G_GUINT64_FORMAT " < %"
2176               G_GUINT64_FORMAT, new_writing_pos, next->offset);
2177           if (new_writing_pos < next->offset)
2178             break;
2179
2180           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
2181               next->writing_pos);
2182
2183           /* remove the group */
2184           queue->current->next = next->next;
2185
2186           /* We use the threshold to decide if we want to do a seek or simply
2187            * read the data again. If there is not so much data in the range we
2188            * prefer to avoid to seek and read it again. */
2189           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
2190             /* the new range had more data than the threshold, it's worth keeping
2191              * it and doing a seek. */
2192             new_writing_pos = next->writing_pos;
2193             do_seek = TRUE;
2194           }
2195           g_slice_free (GstQueue2Range, next);
2196         }
2197         goto update_and_signal;
2198       }
2199     } else {
2200       /* wrapping */
2201       guint block_one, block_two;
2202
2203       block_one = rb_size - writing_pos;
2204       block_two = to_write - block_one;
2205
2206       if (block_one > 0) {
2207         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
2208         /* write data to end of ring buffer */
2209         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2210           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
2211             goto handle_error;
2212         } else {
2213           memcpy (ring_buffer + writing_pos, data, block_one);
2214         }
2215       }
2216
2217       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
2218         goto seek_failed;
2219
2220       if (block_two > 0) {
2221         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
2222         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2223           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
2224             goto handle_error;
2225         } else {
2226           memcpy (ring_buffer, data + block_one, block_two);
2227         }
2228       }
2229     }
2230
2231   update_and_signal:
2232     /* update the writing positions */
2233     size -= to_write;
2234     GST_INFO_OBJECT (queue,
2235         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
2236         to_write, writing_pos, size);
2237
2238     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2239       data += to_write;
2240       queue->current->writing_pos += to_write;
2241       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
2242     } else {
2243       queue->current->writing_pos = writing_pos = new_writing_pos;
2244 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
2245       if (do_seek) {
2246         if (queue->upstream_size == 0 || new_writing_pos < queue->upstream_size)
2247           perform_seek_to_offset (queue, new_writing_pos);
2248         else
2249           queue->is_eos = TRUE;
2250       }
2251 #endif
2252     }
2253
2254 #ifndef TIZEN_FEATURE_QUEUE2_MODIFICATION
2255     if (do_seek)
2256       perform_seek_to_offset (queue, new_writing_pos);
2257 #endif
2258
2259     update_cur_level (queue, queue->current);
2260
2261     /* update the buffering status */
2262     if (queue->use_buffering) {
2263       GstMessage *msg;
2264       update_buffering (queue);
2265       msg = gst_queue2_get_buffering_message (queue);
2266       if (msg) {
2267         GST_QUEUE2_MUTEX_UNLOCK (queue);
2268         g_mutex_lock (&queue->buffering_post_lock);
2269         gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
2270         g_mutex_unlock (&queue->buffering_post_lock);
2271         GST_QUEUE2_MUTEX_LOCK (queue);
2272       }
2273     }
2274
2275     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2276         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2277
2278     GST_QUEUE2_SIGNAL_ADD (queue);
2279   }
2280
2281   gst_buffer_unmap (buffer, &info);
2282
2283   return TRUE;
2284
2285   /* ERRORS */
2286 out_flushing:
2287   {
2288     GST_DEBUG_OBJECT (queue, "we are flushing");
2289     gst_buffer_unmap (buffer, &info);
2290     /* FIXME - GST_FLOW_EOS ? */
2291     return FALSE;
2292   }
2293 seek_failed:
2294   {
2295     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2296     gst_buffer_unmap (buffer, &info);
2297     return FALSE;
2298   }
2299 handle_error:
2300   {
2301     switch (errno) {
2302       case ENOSPC:{
2303         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2304         break;
2305       }
2306       default:{
2307         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2308             (_("Error while writing to download file.")),
2309             ("%s", g_strerror (errno)));
2310       }
2311     }
2312     gst_buffer_unmap (buffer, &info);
2313     return FALSE;
2314   }
2315 buffer_read_error:
2316   {
2317     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL),
2318         ("Can't read from buffer"));
2319     return FALSE;
2320   }
2321 }
2322
2323 static gboolean
2324 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2325 {
2326   GstQueue2 *queue = q;
2327
2328   GST_TRACE_OBJECT (queue,
2329       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2330       gst_buffer_get_size (*buf));
2331
2332   if (!gst_queue2_create_write (queue, *buf)) {
2333     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2334     return FALSE;
2335   }
2336   return TRUE;
2337 }
2338
2339 static gboolean
2340 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
2341 {
2342   guint *p_size = data;
2343   gsize buf_size;
2344
2345   buf_size = gst_buffer_get_size (*buf);
2346   GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
2347   *p_size += buf_size;
2348   return TRUE;
2349 }
2350
2351 /* enqueue an item an update the level stats */
2352 static void
2353 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2354     GstQueue2ItemType item_type)
2355 {
2356   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2357     GstBuffer *buffer;
2358     guint size;
2359
2360     buffer = GST_BUFFER_CAST (item);
2361     size = gst_buffer_get_size (buffer);
2362
2363     /* add buffer to the statistics */
2364     if (QUEUE_IS_USING_QUEUE (queue)) {
2365       queue->cur_level.buffers++;
2366       queue->cur_level.bytes += size;
2367     }
2368     queue->bytes_in += size;
2369
2370     /* apply new buffer to segment stats */
2371     apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE);
2372     /* update the byterate stats */
2373     update_in_rates (queue, FALSE);
2374
2375     if (!QUEUE_IS_USING_QUEUE (queue)) {
2376       /* FIXME - check return value? */
2377       gst_queue2_create_write (queue, buffer);
2378     }
2379   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2380     GstBufferList *buffer_list;
2381     guint size = 0;
2382
2383     buffer_list = GST_BUFFER_LIST_CAST (item);
2384
2385     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2386     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2387
2388     /* add buffer to the statistics */
2389     if (QUEUE_IS_USING_QUEUE (queue)) {
2390       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2391       queue->cur_level.bytes += size;
2392     }
2393     queue->bytes_in += size;
2394
2395     /* apply new buffer to segment stats */
2396     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2397
2398     /* update the byterate stats */
2399     update_in_rates (queue, FALSE);
2400
2401     if (!QUEUE_IS_USING_QUEUE (queue)) {
2402       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2403     }
2404   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2405     GstEvent *event;
2406
2407     event = GST_EVENT_CAST (item);
2408
2409     switch (GST_EVENT_TYPE (event)) {
2410       case GST_EVENT_EOS:
2411         /* Zero the thresholds, this makes sure the queue is completely
2412          * filled and we can read all data from the queue. */
2413         GST_DEBUG_OBJECT (queue, "we have EOS");
2414         queue->is_eos = TRUE;
2415         /* Force updating the input bitrate */
2416         update_in_rates (queue, TRUE);
2417         break;
2418       case GST_EVENT_SEGMENT:
2419         apply_segment (queue, event, &queue->sink_segment, TRUE);
2420         /* This is our first new segment, we hold it
2421          * as we can't save it on the temp file */
2422         if (!QUEUE_IS_USING_QUEUE (queue)) {
2423           if (queue->segment_event_received)
2424             goto unexpected_event;
2425
2426           queue->segment_event_received = TRUE;
2427           if (queue->starting_segment != NULL)
2428             gst_event_unref (queue->starting_segment);
2429           queue->starting_segment = event;
2430           item = NULL;
2431         }
2432         /* a new segment allows us to accept more buffers if we got EOS
2433          * from downstream */
2434         queue->unexpected = FALSE;
2435         break;
2436       case GST_EVENT_GAP:
2437         apply_gap (queue, event, &queue->sink_segment, TRUE);
2438         break;
2439       case GST_EVENT_STREAM_START:
2440         if (!QUEUE_IS_USING_QUEUE (queue)) {
2441           gst_event_replace (&queue->stream_start_event, event);
2442           gst_event_unref (event);
2443           item = NULL;
2444         }
2445         break;
2446       case GST_EVENT_CAPS:{
2447         GstCaps *caps;
2448
2449         gst_event_parse_caps (event, &caps);
2450         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2451
2452         if (!QUEUE_IS_USING_QUEUE (queue)) {
2453           GST_LOG ("Dropping caps event, not using queue");
2454           gst_event_unref (event);
2455           item = NULL;
2456         }
2457         break;
2458       }
2459       default:
2460         if (!QUEUE_IS_USING_QUEUE (queue))
2461           goto unexpected_event;
2462         break;
2463     }
2464   } else if (GST_IS_QUERY (item)) {
2465     /* Can't happen as we check that in the caller */
2466     if (!QUEUE_IS_USING_QUEUE (queue))
2467       g_assert_not_reached ();
2468   } else {
2469     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2470         item, GST_OBJECT_NAME (queue));
2471     /* we can't really unref since we don't know what it is */
2472     item = NULL;
2473   }
2474
2475   if (item) {
2476     /* update the buffering status */
2477     if (queue->use_buffering)
2478       update_buffering (queue);
2479
2480     if (QUEUE_IS_USING_QUEUE (queue)) {
2481       GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2482       qitem->type = item_type;
2483       qitem->item = item;
2484       g_queue_push_tail (&queue->queue, qitem);
2485     } else {
2486       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2487     }
2488
2489     GST_QUEUE2_SIGNAL_ADD (queue);
2490   }
2491
2492   return;
2493
2494   /* ERRORS */
2495 unexpected_event:
2496   {
2497     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2498
2499     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2500         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2501         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2502     gst_event_unref (GST_EVENT_CAST (item));
2503     return;
2504   }
2505 }
2506
2507 /* dequeue an item from the queue and update level stats */
2508 static GstMiniObject *
2509 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2510 {
2511   GstMiniObject *item;
2512
2513   if (!QUEUE_IS_USING_QUEUE (queue)) {
2514     item = gst_queue2_read_item_from_file (queue);
2515   } else {
2516     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2517
2518     if (qitem == NULL)
2519       goto no_item;
2520
2521     item = qitem->item;
2522     g_slice_free (GstQueue2Item, qitem);
2523   }
2524
2525   if (item == NULL)
2526     goto no_item;
2527
2528   if (GST_IS_BUFFER (item)) {
2529     GstBuffer *buffer;
2530     guint size;
2531
2532     buffer = GST_BUFFER_CAST (item);
2533     size = gst_buffer_get_size (buffer);
2534     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2535
2536     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2537         "retrieved buffer %p from queue", buffer);
2538
2539     if (QUEUE_IS_USING_QUEUE (queue)) {
2540       queue->cur_level.buffers--;
2541       queue->cur_level.bytes -= size;
2542     }
2543     queue->bytes_out += size;
2544
2545     apply_buffer (queue, buffer, &queue->src_segment, size, FALSE);
2546     /* update the byterate stats */
2547     update_out_rates (queue);
2548     /* update the buffering */
2549     if (queue->use_buffering)
2550       update_buffering (queue);
2551
2552   } else if (GST_IS_EVENT (item)) {
2553     GstEvent *event = GST_EVENT_CAST (item);
2554
2555     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2556
2557     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2558         "retrieved event %p from queue", event);
2559
2560     switch (GST_EVENT_TYPE (event)) {
2561       case GST_EVENT_EOS:
2562         /* queue is empty now that we dequeued the EOS */
2563         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2564         break;
2565       case GST_EVENT_SEGMENT:
2566         apply_segment (queue, event, &queue->src_segment, FALSE);
2567         break;
2568       case GST_EVENT_GAP:
2569         apply_gap (queue, event, &queue->src_segment, FALSE);
2570         break;
2571       default:
2572         break;
2573     }
2574   } else if (GST_IS_BUFFER_LIST (item)) {
2575     GstBufferList *buffer_list;
2576     guint size = 0;
2577
2578     buffer_list = GST_BUFFER_LIST_CAST (item);
2579     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2580     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2581
2582     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2583         "retrieved buffer list %p from queue", buffer_list);
2584
2585     if (QUEUE_IS_USING_QUEUE (queue)) {
2586       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2587       queue->cur_level.bytes -= size;
2588     }
2589     queue->bytes_out += size;
2590
2591     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2592     /* update the byterate stats */
2593     update_out_rates (queue);
2594     /* update the buffering */
2595     if (queue->use_buffering)
2596       update_buffering (queue);
2597   } else if (GST_IS_QUERY (item)) {
2598     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2599         "retrieved query %p from queue", item);
2600     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2601   } else {
2602     g_warning
2603         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2604         item, GST_OBJECT_NAME (queue));
2605     item = NULL;
2606     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2607   }
2608   GST_QUEUE2_SIGNAL_DEL (queue);
2609
2610   return item;
2611
2612   /* ERRORS */
2613 no_item:
2614   {
2615     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2616     return NULL;
2617   }
2618 }
2619
2620 static GstFlowReturn
2621 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2622     GstEvent * event)
2623 {
2624   gboolean ret = TRUE;
2625   GstQueue2 *queue;
2626
2627   queue = GST_QUEUE2 (parent);
2628
2629   switch (GST_EVENT_TYPE (event)) {
2630     case GST_EVENT_FLUSH_START:
2631     {
2632       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2633       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2634         /* forward event */
2635         ret = gst_pad_push_event (queue->srcpad, event);
2636
2637         /* now unblock the chain function */
2638         GST_QUEUE2_MUTEX_LOCK (queue);
2639         queue->srcresult = GST_FLOW_FLUSHING;
2640         queue->sinkresult = GST_FLOW_FLUSHING;
2641         /* unblock the loop and chain functions */
2642         GST_QUEUE2_SIGNAL_ADD (queue);
2643         GST_QUEUE2_SIGNAL_DEL (queue);
2644         GST_QUEUE2_MUTEX_UNLOCK (queue);
2645
2646         /* make sure it pauses, this should happen since we sent
2647          * flush_start downstream. */
2648         gst_pad_pause_task (queue->srcpad);
2649         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2650
2651         GST_QUEUE2_MUTEX_LOCK (queue);
2652         queue->last_query = FALSE;
2653         g_cond_signal (&queue->query_handled);
2654         GST_QUEUE2_MUTEX_UNLOCK (queue);
2655       } else {
2656         GST_QUEUE2_MUTEX_LOCK (queue);
2657         /* flush the sink pad */
2658         queue->sinkresult = GST_FLOW_FLUSHING;
2659         GST_QUEUE2_SIGNAL_DEL (queue);
2660         queue->last_query = FALSE;
2661         g_cond_signal (&queue->query_handled);
2662         GST_QUEUE2_MUTEX_UNLOCK (queue);
2663
2664         gst_event_unref (event);
2665       }
2666       break;
2667     }
2668     case GST_EVENT_FLUSH_STOP:
2669     {
2670       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2671
2672       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2673         /* forward event */
2674         ret = gst_pad_push_event (queue->srcpad, event);
2675
2676         GST_QUEUE2_MUTEX_LOCK (queue);
2677         gst_queue2_locked_flush (queue, FALSE, TRUE);
2678         queue->srcresult = GST_FLOW_OK;
2679         queue->sinkresult = GST_FLOW_OK;
2680         queue->is_eos = FALSE;
2681         queue->unexpected = FALSE;
2682         queue->seeking = FALSE;
2683         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2684         /* reset rate counters */
2685         reset_rate_timer (queue);
2686         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2687             queue->srcpad, NULL);
2688         GST_QUEUE2_MUTEX_UNLOCK (queue);
2689       } else {
2690         GST_QUEUE2_MUTEX_LOCK (queue);
2691         queue->segment_event_received = FALSE;
2692         queue->is_eos = FALSE;
2693         queue->unexpected = FALSE;
2694         queue->sinkresult = GST_FLOW_OK;
2695         queue->seeking = FALSE;
2696         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2697         GST_QUEUE2_MUTEX_UNLOCK (queue);
2698
2699         gst_event_unref (event);
2700       }
2701       break;
2702     }
2703     case GST_EVENT_TAG:{
2704       if (queue->use_tags_bitrate) {
2705         GstTagList *tags;
2706         guint bitrate;
2707
2708         gst_event_parse_tag (event, &tags);
2709         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2710             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2711           GST_QUEUE2_MUTEX_LOCK (queue);
2712           queue->sink_tags_bitrate = bitrate;
2713           GST_QUEUE2_MUTEX_UNLOCK (queue);
2714           GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
2715         }
2716       }
2717       /* Fall-through */
2718     }
2719     default:
2720       if (GST_EVENT_IS_SERIALIZED (event)) {
2721         /* serialized events go in the queue */
2722         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2723         if (queue->srcresult != GST_FLOW_OK) {
2724           /* Errors in sticky event pushing are no problem and ignored here
2725            * as they will cause more meaningful errors during data flow.
2726            * For EOS events, that are not followed by data flow, we still
2727            * return FALSE here though and report an error.
2728            */
2729           if (!GST_EVENT_IS_STICKY (event)) {
2730             goto out_flow_error;
2731           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2732             if (queue->srcresult == GST_FLOW_NOT_LINKED
2733                 || queue->srcresult < GST_FLOW_EOS) {
2734               GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
2735             }
2736             goto out_flow_error;
2737           }
2738         }
2739         /* refuse more events on EOS */
2740         if (queue->is_eos)
2741           goto out_eos;
2742         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2743         GST_QUEUE2_MUTEX_UNLOCK (queue);
2744         gst_queue2_post_buffering (queue);
2745       } else {
2746         /* non-serialized events are passed downstream. */
2747         ret = gst_pad_push_event (queue->srcpad, event);
2748       }
2749       break;
2750   }
2751   if (ret == FALSE)
2752     return GST_FLOW_ERROR;
2753   return GST_FLOW_OK;
2754
2755   /* ERRORS */
2756 out_flushing:
2757   {
2758     GstFlowReturn ret = queue->sinkresult;
2759     GST_DEBUG_OBJECT (queue, "refusing event, we are %s",
2760         gst_flow_get_name (ret));
2761     GST_QUEUE2_MUTEX_UNLOCK (queue);
2762     gst_event_unref (event);
2763     return ret;
2764   }
2765 out_eos:
2766   {
2767     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2768     GST_QUEUE2_MUTEX_UNLOCK (queue);
2769     gst_event_unref (event);
2770     return GST_FLOW_EOS;
2771   }
2772 out_flow_error:
2773   {
2774     GST_LOG_OBJECT (queue,
2775         "refusing event, we have a downstream flow error: %s",
2776         gst_flow_get_name (queue->srcresult));
2777     GST_QUEUE2_MUTEX_UNLOCK (queue);
2778     gst_event_unref (event);
2779     return queue->srcresult;
2780   }
2781 }
2782
2783 static gboolean
2784 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2785     GstQuery * query)
2786 {
2787   GstQueue2 *queue;
2788   gboolean res;
2789
2790   queue = GST_QUEUE2 (parent);
2791
2792   switch (GST_QUERY_TYPE (query)) {
2793     default:
2794       if (GST_QUERY_IS_SERIALIZED (query)) {
2795         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2796         /* serialized events go in the queue. We need to be certain that we
2797          * don't cause deadlocks waiting for the query return value. We check if
2798          * the queue is empty (nothing is blocking downstream and the query can
2799          * be pushed for sure) or we are not buffering. If we are buffering,
2800          * the pipeline waits to unblock downstream until our queue fills up
2801          * completely, which can not happen if we block on the query..
2802          * Therefore we only potentially block when we are not buffering. */
2803         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2804         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2805                 || !queue->use_buffering)) {
2806           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2807             gst_queue2_locked_enqueue (queue, query,
2808                 GST_QUEUE2_ITEM_TYPE_QUERY);
2809
2810             STATUS (queue, queue->sinkpad, "wait for QUERY");
2811             while (queue->sinkresult == GST_FLOW_OK &&
2812                 queue->last_handled_query != query)
2813               g_cond_wait (&queue->query_handled, &queue->qlock);
2814             queue->last_handled_query = NULL;
2815             if (queue->sinkresult != GST_FLOW_OK)
2816               goto out_flushing;
2817             res = queue->last_query;
2818           } else {
2819             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2820             res = FALSE;
2821           }
2822         } else {
2823           GST_DEBUG_OBJECT (queue,
2824               "refusing query, we are not using the queue");
2825           res = FALSE;
2826         }
2827         GST_QUEUE2_MUTEX_UNLOCK (queue);
2828         gst_queue2_post_buffering (queue);
2829       } else {
2830         res = gst_pad_query_default (pad, parent, query);
2831       }
2832       break;
2833   }
2834   return res;
2835
2836   /* ERRORS */
2837 out_flushing:
2838   {
2839     GST_DEBUG_OBJECT (queue, "refusing query, we are %s",
2840         gst_flow_get_name (queue->sinkresult));
2841     GST_QUEUE2_MUTEX_UNLOCK (queue);
2842     return FALSE;
2843   }
2844 }
2845
2846 static gboolean
2847 gst_queue2_is_empty (GstQueue2 * queue)
2848 {
2849   /* never empty on EOS */
2850   if (queue->is_eos)
2851     return FALSE;
2852
2853   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2854     return queue->current->writing_pos <= queue->current->max_reading_pos;
2855   } else {
2856     if (queue->queue.length == 0)
2857       return TRUE;
2858   }
2859
2860   return FALSE;
2861 }
2862
2863 static gboolean
2864 gst_queue2_is_filled (GstQueue2 * queue)
2865 {
2866   gboolean res;
2867
2868   /* always filled on EOS */
2869   if (queue->is_eos)
2870     return TRUE;
2871
2872 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2873     (queue->cur_level.format) >= ((alt_max) ? \
2874       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2875
2876   /* if using a ring buffer we're filled if all ring buffer space is used
2877    * _by the current range_ */
2878   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2879     guint64 rb_size = queue->ring_buffer_max_size;
2880     GST_DEBUG_OBJECT (queue,
2881         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2882         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2883     return CHECK_FILLED (bytes, rb_size);
2884   }
2885
2886   /* if using file, we're never filled if we don't have EOS */
2887   if (QUEUE_IS_USING_TEMP_FILE (queue))
2888     return FALSE;
2889
2890   /* we are never filled when we have no buffers at all */
2891   if (queue->cur_level.buffers == 0)
2892     return FALSE;
2893
2894   /* we are filled if one of the current levels exceeds the max */
2895   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2896       || CHECK_FILLED (time, 0);
2897
2898   /* if we need to, use the rate estimate to check against the max time we are
2899    * allowed to queue */
2900   if (queue->use_rate_estimate)
2901     res |= CHECK_FILLED (rate_time, 0);
2902
2903 #undef CHECK_FILLED
2904   return res;
2905 }
2906
2907 static GstFlowReturn
2908 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2909     GstMiniObject * item, GstQueue2ItemType item_type)
2910 {
2911   /* we have to lock the queue since we span threads */
2912   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2913   /* when we received EOS, we refuse more data */
2914   if (queue->is_eos)
2915     goto out_eos;
2916   /* when we received unexpected from downstream, refuse more buffers */
2917   if (queue->unexpected)
2918     goto out_unexpected;
2919
2920   /* while we didn't receive the newsegment, we're seeking and we skip data */
2921   if (queue->seeking)
2922     goto out_seeking;
2923
2924   if (!gst_queue2_wait_free_space (queue))
2925     goto out_flushing;
2926
2927   /* put buffer in queue now */
2928   gst_queue2_locked_enqueue (queue, item, item_type);
2929   GST_QUEUE2_MUTEX_UNLOCK (queue);
2930   gst_queue2_post_buffering (queue);
2931
2932   return GST_FLOW_OK;
2933
2934   /* special conditions */
2935 out_flushing:
2936   {
2937     GstFlowReturn ret = queue->sinkresult;
2938
2939     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2940         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2941     GST_QUEUE2_MUTEX_UNLOCK (queue);
2942     gst_mini_object_unref (item);
2943
2944     return ret;
2945   }
2946 out_eos:
2947   {
2948     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2949     GST_QUEUE2_MUTEX_UNLOCK (queue);
2950     gst_mini_object_unref (item);
2951
2952     return GST_FLOW_EOS;
2953   }
2954 out_seeking:
2955   {
2956     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2957     GST_QUEUE2_MUTEX_UNLOCK (queue);
2958     gst_mini_object_unref (item);
2959
2960     return GST_FLOW_OK;
2961   }
2962 out_unexpected:
2963   {
2964     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2965     GST_QUEUE2_MUTEX_UNLOCK (queue);
2966     gst_mini_object_unref (item);
2967
2968     return GST_FLOW_EOS;
2969   }
2970 }
2971
2972 static GstFlowReturn
2973 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2974 {
2975   GstQueue2 *queue;
2976
2977   queue = GST_QUEUE2 (parent);
2978
2979   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2980       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2981       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2982       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2983       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2984
2985   return gst_queue2_chain_buffer_or_buffer_list (queue,
2986       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2987 }
2988
2989 static GstFlowReturn
2990 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2991     GstBufferList * buffer_list)
2992 {
2993   GstQueue2 *queue;
2994
2995   queue = GST_QUEUE2 (parent);
2996
2997   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2998       "received buffer list %p", buffer_list);
2999
3000   return gst_queue2_chain_buffer_or_buffer_list (queue,
3001       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
3002 }
3003
3004 static GstMiniObject *
3005 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
3006 {
3007   GstMiniObject *data;
3008
3009   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
3010
3011   /* stop pushing buffers, we dequeue all items until we see an item that we
3012    * can push again, which is EOS or SEGMENT. If there is nothing in the
3013    * queue we can push, we set a flag to make the sinkpad refuse more
3014    * buffers with an EOS return value until we receive something
3015    * pushable again or we get flushed. */
3016   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
3017     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
3018       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3019           "dropping EOS buffer %p", data);
3020       gst_buffer_unref (GST_BUFFER_CAST (data));
3021     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
3022       GstEvent *event = GST_EVENT_CAST (data);
3023       GstEventType type = GST_EVENT_TYPE (event);
3024
3025       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
3026         /* we found a pushable item in the queue, push it out */
3027         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3028             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
3029         return data;
3030       }
3031       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3032           "dropping EOS event %p", event);
3033       gst_event_unref (event);
3034     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
3035       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3036           "dropping EOS buffer list %p", data);
3037       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
3038     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
3039       queue->last_query = FALSE;
3040       g_cond_signal (&queue->query_handled);
3041       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
3042     }
3043   }
3044   /* no more items in the queue. Set the unexpected flag so that upstream
3045    * make us refuse any more buffers on the sinkpad. Since we will still
3046    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
3047    * task function does not shut down. */
3048   queue->unexpected = TRUE;
3049   return NULL;
3050 }
3051
3052 /* dequeue an item from the queue an push it downstream. This functions returns
3053  * the result of the push. */
3054 static GstFlowReturn
3055 gst_queue2_push_one (GstQueue2 * queue)
3056 {
3057   GstFlowReturn result = queue->srcresult;
3058   GstMiniObject *data;
3059   GstQueue2ItemType item_type;
3060
3061   data = gst_queue2_locked_dequeue (queue, &item_type);
3062   if (data == NULL)
3063     goto no_item;
3064
3065 next:
3066   STATUS (queue, queue->srcpad, "We have something dequeud");
3067   g_atomic_int_set (&queue->downstream_may_block,
3068       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
3069       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
3070   GST_QUEUE2_MUTEX_UNLOCK (queue);
3071   gst_queue2_post_buffering (queue);
3072
3073   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
3074     GstBuffer *buffer;
3075
3076     buffer = GST_BUFFER_CAST (data);
3077
3078     result = gst_pad_push (queue->srcpad, buffer);
3079     g_atomic_int_set (&queue->downstream_may_block, 0);
3080
3081     /* need to check for srcresult here as well */
3082     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3083     if (result == GST_FLOW_EOS) {
3084       data = gst_queue2_dequeue_on_eos (queue, &item_type);
3085       if (data != NULL)
3086         goto next;
3087       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
3088        * to the caller so that the task function does not shut down */
3089       result = GST_FLOW_OK;
3090     }
3091   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
3092     GstEvent *event = GST_EVENT_CAST (data);
3093     GstEventType type = GST_EVENT_TYPE (event);
3094
3095     if (type == GST_EVENT_TAG) {
3096       if (queue->use_tags_bitrate) {
3097         GstTagList *tags;
3098         guint bitrate;
3099
3100         gst_event_parse_tag (event, &tags);
3101         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
3102             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
3103           GST_QUEUE2_MUTEX_LOCK (queue);
3104           queue->src_tags_bitrate = bitrate;
3105           GST_QUEUE2_MUTEX_UNLOCK (queue);
3106           GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
3107         }
3108       }
3109     }
3110
3111     gst_pad_push_event (queue->srcpad, event);
3112
3113     /* if we're EOS, return EOS so that the task pauses. */
3114     if (type == GST_EVENT_EOS) {
3115       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3116           "pushed EOS event %p, return EOS", event);
3117       result = GST_FLOW_EOS;
3118     }
3119
3120     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3121   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
3122     GstBufferList *buffer_list;
3123
3124     buffer_list = GST_BUFFER_LIST_CAST (data);
3125
3126     result = gst_pad_push_list (queue->srcpad, buffer_list);
3127     g_atomic_int_set (&queue->downstream_may_block, 0);
3128
3129     /* need to check for srcresult here as well */
3130     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3131     if (result == GST_FLOW_EOS) {
3132       data = gst_queue2_dequeue_on_eos (queue, &item_type);
3133       if (data != NULL)
3134         goto next;
3135       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
3136        * to the caller so that the task function does not shut down */
3137       result = GST_FLOW_OK;
3138     }
3139   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
3140     GstQuery *query = GST_QUERY_CAST (data);
3141
3142     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
3143     queue->last_handled_query = query;
3144     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
3145     GST_LOG_OBJECT (queue->srcpad, "Peered query");
3146     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3147         "did query %p, return %d", query, queue->last_query);
3148     g_cond_signal (&queue->query_handled);
3149     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3150     result = GST_FLOW_OK;
3151   }
3152   return result;
3153
3154   /* ERRORS */
3155 no_item:
3156   {
3157     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3158         "exit because we have no item in the queue");
3159     return GST_FLOW_ERROR;
3160   }
3161 out_flushing:
3162   {
3163     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are %s",
3164         gst_flow_get_name (queue->srcresult));
3165     return queue->srcresult;
3166   }
3167 }
3168
3169 /* called repeatedly with @pad as the source pad. This function should push out
3170  * data to the peer element. */
3171 static void
3172 gst_queue2_loop (GstPad * pad)
3173 {
3174   GstQueue2 *queue;
3175 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
3176   GstFlowReturn ret = GST_FLOW_OK;
3177 #else
3178   GstFlowReturn ret;
3179 #endif
3180
3181   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
3182
3183   /* have to lock for thread-safety */
3184   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3185
3186   if (gst_queue2_is_empty (queue)) {
3187     gboolean started;
3188
3189     /* pause the timer while we wait. The fact that we are waiting does not mean
3190      * the byterate on the output pad is lower */
3191     if ((started = queue->out_timer_started))
3192       g_timer_stop (queue->out_timer);
3193
3194     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
3195         "queue is empty, waiting for new data");
3196     do {
3197       /* Wait for data to be available, we could be unlocked because of a flush. */
3198       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
3199     }
3200     while (gst_queue2_is_empty (queue));
3201
3202     /* and continue if we were running before */
3203     if (started)
3204       g_timer_continue (queue->out_timer);
3205   }
3206 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
3207   /* if buffering mode is GST_BUFFERING_LIVE, it is rtsp streaming */
3208   if (!((queue->mode == GST_BUFFERING_LIVE) && queue->is_buffering)) {
3209     ret = gst_queue2_push_one (queue);
3210   }
3211 #else
3212   ret = gst_queue2_push_one (queue);
3213 #endif
3214   queue->srcresult = ret;
3215   queue->sinkresult = ret;
3216   if (ret != GST_FLOW_OK)
3217     goto out_flushing;
3218
3219   GST_QUEUE2_MUTEX_UNLOCK (queue);
3220   gst_queue2_post_buffering (queue);
3221
3222   return;
3223
3224   /* ERRORS */
3225 out_flushing:
3226   {
3227     gboolean eos = queue->is_eos;
3228     GstFlowReturn ret = queue->srcresult;
3229
3230     gst_pad_pause_task (queue->srcpad);
3231     if (ret == GST_FLOW_FLUSHING) {
3232       gst_queue2_locked_flush (queue, FALSE, FALSE);
3233     } else {
3234       GST_QUEUE2_SIGNAL_DEL (queue);
3235       queue->last_query = FALSE;
3236       g_cond_signal (&queue->query_handled);
3237     }
3238     GST_QUEUE2_MUTEX_UNLOCK (queue);
3239     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3240         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
3241     /* let app know about us giving up if upstream is not expected to do so */
3242     /* EOS is already taken care of elsewhere */
3243     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
3244       GST_ELEMENT_FLOW_ERROR (queue, ret);
3245       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
3246     }
3247     return;
3248   }
3249 }
3250
3251 static gboolean
3252 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3253 {
3254   gboolean res = TRUE;
3255   GstQueue2 *queue = GST_QUEUE2 (parent);
3256
3257 #ifndef GST_DISABLE_GST_DEBUG
3258   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
3259       event, GST_EVENT_TYPE_NAME (event));
3260 #endif
3261
3262   switch (GST_EVENT_TYPE (event)) {
3263     case GST_EVENT_FLUSH_START:
3264       if (QUEUE_IS_USING_QUEUE (queue)) {
3265         /* just forward upstream */
3266         res = gst_pad_push_event (queue->sinkpad, event);
3267       } else {
3268         /* now unblock the getrange function */
3269         GST_QUEUE2_MUTEX_LOCK (queue);
3270         GST_DEBUG_OBJECT (queue, "flushing");
3271         queue->srcresult = GST_FLOW_FLUSHING;
3272         GST_QUEUE2_SIGNAL_ADD (queue);
3273         GST_QUEUE2_MUTEX_UNLOCK (queue);
3274
3275         /* when using a temp file, we eat the event */
3276         res = TRUE;
3277         gst_event_unref (event);
3278       }
3279       break;
3280     case GST_EVENT_FLUSH_STOP:
3281       if (QUEUE_IS_USING_QUEUE (queue)) {
3282         /* just forward upstream */
3283         res = gst_pad_push_event (queue->sinkpad, event);
3284       } else {
3285         /* now unblock the getrange function */
3286         GST_QUEUE2_MUTEX_LOCK (queue);
3287         queue->srcresult = GST_FLOW_OK;
3288         GST_QUEUE2_MUTEX_UNLOCK (queue);
3289
3290         /* when using a temp file, we eat the event */
3291         res = TRUE;
3292         gst_event_unref (event);
3293       }
3294       break;
3295     case GST_EVENT_RECONFIGURE:
3296       GST_QUEUE2_MUTEX_LOCK (queue);
3297       /* assume downstream is linked now and try to push again */
3298       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
3299         queue->srcresult = GST_FLOW_OK;
3300         queue->sinkresult = GST_FLOW_OK;
3301         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
3302           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
3303               NULL);
3304         }
3305       }
3306       GST_QUEUE2_MUTEX_UNLOCK (queue);
3307
3308       res = gst_pad_push_event (queue->sinkpad, event);
3309       break;
3310     default:
3311       res = gst_pad_push_event (queue->sinkpad, event);
3312       break;
3313   }
3314
3315   return res;
3316 }
3317
3318 static gboolean
3319 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3320 {
3321   GstQueue2 *queue;
3322
3323   queue = GST_QUEUE2 (parent);
3324
3325   switch (GST_QUERY_TYPE (query)) {
3326     case GST_QUERY_POSITION:
3327     {
3328       gint64 peer_pos;
3329       GstFormat format;
3330
3331       if (!gst_pad_peer_query (queue->sinkpad, query))
3332         goto peer_failed;
3333
3334       /* get peer position */
3335       gst_query_parse_position (query, &format, &peer_pos);
3336
3337       /* FIXME: this code assumes that there's no discont in the queue */
3338       switch (format) {
3339         case GST_FORMAT_BYTES:
3340           peer_pos -= queue->cur_level.bytes;
3341           if (peer_pos < 0)     /* Clamp result to 0 */
3342             peer_pos = 0;
3343           break;
3344         case GST_FORMAT_TIME:
3345           peer_pos -= queue->cur_level.time;
3346           if (peer_pos < 0)     /* Clamp result to 0 */
3347             peer_pos = 0;
3348           break;
3349         default:
3350           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3351               "know how to adjust value", gst_format_get_name (format));
3352           return FALSE;
3353       }
3354       /* set updated position */
3355       gst_query_set_position (query, format, peer_pos);
3356       break;
3357     }
3358     case GST_QUERY_DURATION:
3359     {
3360       GST_DEBUG_OBJECT (queue, "doing peer query");
3361
3362       if (!gst_pad_peer_query (queue->sinkpad, query))
3363         goto peer_failed;
3364
3365       GST_DEBUG_OBJECT (queue, "peer query success");
3366       break;
3367     }
3368     case GST_QUERY_BUFFERING:
3369     {
3370       gint percent;
3371       gboolean is_buffering;
3372       GstBufferingMode mode;
3373       gint avg_in, avg_out;
3374       gint64 buffering_left;
3375
3376       GST_DEBUG_OBJECT (queue, "query buffering");
3377
3378       get_buffering_level (queue, &is_buffering, &percent);
3379       percent = convert_to_buffering_percent (queue, percent);
3380       gst_query_set_buffering_percent (query, is_buffering, percent);
3381
3382       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3383           &buffering_left);
3384       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3385           buffering_left);
3386
3387       if (!QUEUE_IS_USING_QUEUE (queue)) {
3388         /* add ranges for download and ringbuffer buffering */
3389         GstFormat format;
3390         gint64 start, stop, range_start, range_stop;
3391         guint64 writing_pos;
3392         gint64 estimated_total;
3393         gint64 duration;
3394         gboolean peer_res, is_eos;
3395         GstQueue2Range *queued_ranges;
3396
3397         /* we need a current download region */
3398         if (queue->current == NULL)
3399           return FALSE;
3400
3401         writing_pos = queue->current->writing_pos;
3402         is_eos = queue->is_eos;
3403
3404         if (is_eos) {
3405           /* we're EOS, we know the duration in bytes now */
3406           peer_res = TRUE;
3407           duration = writing_pos;
3408         } else {
3409           /* get duration of upstream in bytes */
3410           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3411               GST_FORMAT_BYTES, &duration);
3412         }
3413
3414         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3415             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3416
3417         /* calculate remaining and total download time */
3418         if (peer_res && avg_in > 0.0)
3419           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3420         else
3421           estimated_total = -1;
3422
3423         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3424             estimated_total);
3425
3426         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3427
3428         switch (format) {
3429           case GST_FORMAT_PERCENT:
3430             /* we need duration */
3431             if (!peer_res)
3432               goto peer_failed;
3433
3434             start = 0;
3435             /* get our available data relative to the duration */
3436             if (duration != -1)
3437               stop =
3438                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3439                   duration);
3440             else
3441               stop = -1;
3442             break;
3443           case GST_FORMAT_BYTES:
3444             start = 0;
3445             stop = writing_pos;
3446             break;
3447           default:
3448             start = -1;
3449             stop = -1;
3450             break;
3451         }
3452
3453         /* fill out the buffered ranges */
3454         for (queued_ranges = queue->ranges; queued_ranges;
3455             queued_ranges = queued_ranges->next) {
3456           switch (format) {
3457             case GST_FORMAT_PERCENT:
3458               if (duration == -1) {
3459                 range_start = 0;
3460                 range_stop = 0;
3461                 break;
3462               }
3463 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3464               range_start =
3465                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3466                   queued_ranges->reading_pos, duration);
3467 #else
3468               range_start =
3469                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3470                   queued_ranges->offset, duration);
3471 #endif
3472               range_stop =
3473                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3474                   queued_ranges->writing_pos, duration);
3475               break;
3476             case GST_FORMAT_BYTES:
3477 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3478               range_start = queued_ranges->reading_pos;
3479 #else
3480               range_start = queued_ranges->offset;
3481 #endif
3482               range_stop = queued_ranges->writing_pos;
3483               break;
3484             default:
3485               range_start = -1;
3486               range_stop = -1;
3487               break;
3488           }
3489           if (range_start == range_stop)
3490             continue;
3491           GST_DEBUG_OBJECT (queue,
3492               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3493               G_GINT64_FORMAT, range_start, range_stop);
3494           gst_query_add_buffering_range (query, range_start, range_stop);
3495         }
3496
3497         gst_query_set_buffering_range (query, format, start, stop,
3498             estimated_total);
3499       }
3500       break;
3501     }
3502     case GST_QUERY_SCHEDULING:
3503     {
3504       gboolean pull_mode;
3505       GstSchedulingFlags flags = 0;
3506 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3507       if ((!QUEUE_IS_USING_QUEUE (queue)) && !gst_pad_peer_query (queue->sinkpad, query))
3508 #else
3509       if (!gst_pad_peer_query (queue->sinkpad, query))
3510 #endif
3511         goto peer_failed;
3512
3513       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3514
3515 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3516 #ifndef TIZEN_PROFILE_TV
3517       if (!(flags & GST_SCHEDULING_FLAG_SEEKABLE)) {
3518         GST_DEBUG_OBJECT(queue, "peer can support only push mode");
3519         gst_query_set_scheduling (query, flags, 0, -1, 0);
3520         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3521         break;
3522       }
3523 #endif
3524 #endif
3525       GST_DEBUG_OBJECT(queue, "peer can support pull mode");
3526
3527       /* we can operate in pull mode when we are using a tempfile */
3528       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3529
3530       if (pull_mode)
3531         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3532       gst_query_set_scheduling (query, flags, 0, -1, 0);
3533       if (pull_mode)
3534         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3535       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3536       break;
3537     }
3538     default:
3539       /* peer handled other queries */
3540       if (!gst_pad_query_default (pad, parent, query))
3541         goto peer_failed;
3542       break;
3543   }
3544
3545   return TRUE;
3546
3547   /* ERRORS */
3548 peer_failed:
3549   {
3550     GST_DEBUG_OBJECT (queue, "failed peer query");
3551     return FALSE;
3552   }
3553 }
3554
3555 static gboolean
3556 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3557 {
3558   GstQueue2 *queue = GST_QUEUE2 (element);
3559
3560   /* simply forward to the srcpad query function */
3561   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3562       query);
3563 }
3564
3565 static void
3566 gst_queue2_update_upstream_size (GstQueue2 * queue)
3567 {
3568   gint64 upstream_size = -1;
3569
3570   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3571           &upstream_size)) {
3572     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3573
3574     /* upstream_size can be negative but queue->upstream_size is unsigned.
3575      * Prevent setting negative values to it (the query can return -1) */
3576     if (upstream_size >= 0)
3577       queue->upstream_size = upstream_size;
3578     else
3579       queue->upstream_size = 0;
3580   }
3581 }
3582
3583 static GstFlowReturn
3584 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3585     guint length, GstBuffer ** buffer)
3586 {
3587   GstQueue2 *queue;
3588   GstFlowReturn ret;
3589
3590   queue = GST_QUEUE2_CAST (parent);
3591
3592   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3593   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3594   offset = (offset == -1) ? queue->current->reading_pos : offset;
3595
3596   GST_DEBUG_OBJECT (queue,
3597       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3598
3599   /* catch any reads beyond the size of the file here to make sure queue2
3600    * doesn't send seek events beyond the size of the file upstream, since
3601    * that would confuse elements such as souphttpsrc and/or http servers.
3602    * Demuxers often just loop until EOS at the end of the file to figure out
3603    * when they've read all the end-headers or index chunks. */
3604   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3605     gst_queue2_update_upstream_size (queue);
3606     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3607       goto out_unexpected;
3608   }
3609
3610   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3611     gst_queue2_update_upstream_size (queue);
3612     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3613       length = queue->upstream_size - offset;
3614       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3615     }
3616   }
3617
3618   /* FIXME - function will block when the range is not yet available */
3619   ret = gst_queue2_create_read (queue, offset, length, buffer);
3620   GST_QUEUE2_MUTEX_UNLOCK (queue);
3621   gst_queue2_post_buffering (queue);
3622
3623   return ret;
3624
3625   /* ERRORS */
3626 out_flushing:
3627   {
3628     ret = queue->srcresult;
3629
3630     GST_DEBUG_OBJECT (queue, "we are %s", gst_flow_get_name (ret));
3631     GST_QUEUE2_MUTEX_UNLOCK (queue);
3632     return ret;
3633   }
3634 out_unexpected:
3635   {
3636     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3637     GST_QUEUE2_MUTEX_UNLOCK (queue);
3638     return GST_FLOW_EOS;
3639   }
3640 }
3641
3642 /* sink currently only operates in push mode */
3643 static gboolean
3644 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3645     GstPadMode mode, gboolean active)
3646 {
3647   gboolean result;
3648   GstQueue2 *queue;
3649
3650   queue = GST_QUEUE2 (parent);
3651
3652   switch (mode) {
3653     case GST_PAD_MODE_PUSH:
3654       if (active) {
3655         GST_QUEUE2_MUTEX_LOCK (queue);
3656         GST_DEBUG_OBJECT (queue, "activating push mode");
3657         queue->srcresult = GST_FLOW_OK;
3658         queue->sinkresult = GST_FLOW_OK;
3659         queue->is_eos = FALSE;
3660         queue->unexpected = FALSE;
3661         reset_rate_timer (queue);
3662         GST_QUEUE2_MUTEX_UNLOCK (queue);
3663       } else {
3664         /* unblock chain function */
3665         GST_QUEUE2_MUTEX_LOCK (queue);
3666         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3667         queue->srcresult = GST_FLOW_FLUSHING;
3668         queue->sinkresult = GST_FLOW_FLUSHING;
3669         GST_QUEUE2_SIGNAL_DEL (queue);
3670         GST_QUEUE2_MUTEX_UNLOCK (queue);
3671
3672         /* wait until it is unblocked and clean up */
3673         GST_PAD_STREAM_LOCK (pad);
3674         GST_QUEUE2_MUTEX_LOCK (queue);
3675         gst_queue2_locked_flush (queue, TRUE, FALSE);
3676         GST_QUEUE2_MUTEX_UNLOCK (queue);
3677         GST_PAD_STREAM_UNLOCK (pad);
3678       }
3679       result = TRUE;
3680       break;
3681     default:
3682       result = FALSE;
3683       break;
3684   }
3685   return result;
3686 }
3687
3688 /* src operating in push mode, we start a task on the source pad that pushes out
3689  * buffers from the queue */
3690 static gboolean
3691 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3692 {
3693   gboolean result = FALSE;
3694   GstQueue2 *queue;
3695
3696   queue = GST_QUEUE2 (parent);
3697
3698   if (active) {
3699     GST_QUEUE2_MUTEX_LOCK (queue);
3700     GST_DEBUG_OBJECT (queue, "activating push mode");
3701     queue->srcresult = GST_FLOW_OK;
3702     queue->sinkresult = GST_FLOW_OK;
3703     queue->is_eos = FALSE;
3704     queue->unexpected = FALSE;
3705     result =
3706         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3707     GST_QUEUE2_MUTEX_UNLOCK (queue);
3708   } else {
3709     /* unblock loop function */
3710     GST_QUEUE2_MUTEX_LOCK (queue);
3711     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3712     queue->srcresult = GST_FLOW_FLUSHING;
3713     queue->sinkresult = GST_FLOW_FLUSHING;
3714     /* the item add signal will unblock */
3715     GST_QUEUE2_SIGNAL_ADD (queue);
3716     GST_QUEUE2_MUTEX_UNLOCK (queue);
3717
3718     /* step 2, make sure streaming finishes */
3719     result = gst_pad_stop_task (pad);
3720
3721     GST_QUEUE2_MUTEX_LOCK (queue);
3722     gst_queue2_locked_flush (queue, FALSE, FALSE);
3723     GST_QUEUE2_MUTEX_UNLOCK (queue);
3724   }
3725
3726   return result;
3727 }
3728
3729 /* pull mode, downstream will call our getrange function */
3730 static gboolean
3731 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3732 {
3733   gboolean result;
3734   GstQueue2 *queue;
3735
3736   queue = GST_QUEUE2 (parent);
3737
3738   if (active) {
3739     GST_QUEUE2_MUTEX_LOCK (queue);
3740     if (!QUEUE_IS_USING_QUEUE (queue)) {
3741       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3742         /* open the temp file now */
3743         result = gst_queue2_open_temp_location_file (queue);
3744       } else if (!queue->ring_buffer) {
3745         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3746         result = ! !queue->ring_buffer;
3747       } else {
3748         result = TRUE;
3749       }
3750
3751       GST_DEBUG_OBJECT (queue, "activating pull mode");
3752       init_ranges (queue);
3753       queue->srcresult = GST_FLOW_OK;
3754       queue->sinkresult = GST_FLOW_OK;
3755       queue->is_eos = FALSE;
3756       queue->unexpected = FALSE;
3757       queue->upstream_size = 0;
3758     } else {
3759       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3760       /* this is not allowed, we cannot operate in pull mode without a temp
3761        * file. */
3762       queue->srcresult = GST_FLOW_FLUSHING;
3763       queue->sinkresult = GST_FLOW_FLUSHING;
3764       result = FALSE;
3765     }
3766     GST_QUEUE2_MUTEX_UNLOCK (queue);
3767   } else {
3768     GST_QUEUE2_MUTEX_LOCK (queue);
3769     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3770     queue->srcresult = GST_FLOW_FLUSHING;
3771     queue->sinkresult = GST_FLOW_FLUSHING;
3772     /* this will unlock getrange */
3773     GST_QUEUE2_SIGNAL_ADD (queue);
3774     result = TRUE;
3775     GST_QUEUE2_MUTEX_UNLOCK (queue);
3776   }
3777
3778   return result;
3779 }
3780
3781 static gboolean
3782 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3783     gboolean active)
3784 {
3785   gboolean res;
3786
3787   switch (mode) {
3788     case GST_PAD_MODE_PULL:
3789       res = gst_queue2_src_activate_pull (pad, parent, active);
3790       break;
3791     case GST_PAD_MODE_PUSH:
3792       res = gst_queue2_src_activate_push (pad, parent, active);
3793       break;
3794     default:
3795       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3796       res = FALSE;
3797       break;
3798   }
3799   return res;
3800 }
3801
3802 static GstStateChangeReturn
3803 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3804 {
3805   GstQueue2 *queue;
3806   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3807
3808   queue = GST_QUEUE2 (element);
3809
3810   switch (transition) {
3811     case GST_STATE_CHANGE_NULL_TO_READY:
3812       break;
3813     case GST_STATE_CHANGE_READY_TO_PAUSED:
3814       GST_QUEUE2_MUTEX_LOCK (queue);
3815       if (!QUEUE_IS_USING_QUEUE (queue)) {
3816         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3817           if (!gst_queue2_open_temp_location_file (queue))
3818             ret = GST_STATE_CHANGE_FAILURE;
3819         } else {
3820           if (queue->ring_buffer) {
3821             g_free (queue->ring_buffer);
3822             queue->ring_buffer = NULL;
3823           }
3824           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3825             ret = GST_STATE_CHANGE_FAILURE;
3826         }
3827         init_ranges (queue);
3828       }
3829       queue->segment_event_received = FALSE;
3830       queue->starting_segment = NULL;
3831       gst_event_replace (&queue->stream_start_event, NULL);
3832       GST_QUEUE2_MUTEX_UNLOCK (queue);
3833       break;
3834     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3835 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
3836       GST_QUEUE2_MUTEX_LOCK (queue);
3837       queue->is_buffering = FALSE;
3838       GST_QUEUE2_MUTEX_UNLOCK (queue);
3839 #endif
3840       break;
3841     default:
3842       break;
3843   }
3844
3845   if (ret == GST_STATE_CHANGE_FAILURE)
3846     return ret;
3847
3848   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3849
3850   if (ret == GST_STATE_CHANGE_FAILURE)
3851     return ret;
3852
3853   switch (transition) {
3854     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3855       break;
3856     case GST_STATE_CHANGE_PAUSED_TO_READY:
3857       GST_QUEUE2_MUTEX_LOCK (queue);
3858       if (!QUEUE_IS_USING_QUEUE (queue)) {
3859         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3860           gst_queue2_close_temp_location_file (queue);
3861         } else if (queue->ring_buffer) {
3862           g_free (queue->ring_buffer);
3863           queue->ring_buffer = NULL;
3864         }
3865         clean_ranges (queue);
3866       }
3867       if (queue->starting_segment != NULL) {
3868         gst_event_unref (queue->starting_segment);
3869         queue->starting_segment = NULL;
3870       }
3871       gst_event_replace (&queue->stream_start_event, NULL);
3872       GST_QUEUE2_MUTEX_UNLOCK (queue);
3873       break;
3874     case GST_STATE_CHANGE_READY_TO_NULL:
3875       break;
3876     default:
3877       break;
3878   }
3879
3880   return ret;
3881 }
3882
3883 /* changing the capacity of the queue must wake up
3884  * the _chain function, it might have more room now
3885  * to store the buffer/event in the queue */
3886 #define QUEUE_CAPACITY_CHANGE(q) \
3887   GST_QUEUE2_SIGNAL_DEL (queue); \
3888   if (queue->use_buffering)      \
3889     update_buffering (queue);
3890
3891 /* Changing the minimum required fill level must
3892  * wake up the _loop function as it might now
3893  * be able to preceed.
3894  */
3895 #define QUEUE_THRESHOLD_CHANGE(q)\
3896   GST_QUEUE2_SIGNAL_ADD (queue);
3897
3898 static void
3899 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3900 {
3901   GstState state;
3902
3903   /* the element must be stopped in order to do this */
3904   GST_OBJECT_LOCK (queue);
3905   state = GST_STATE (queue);
3906   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3907     goto wrong_state;
3908   GST_OBJECT_UNLOCK (queue);
3909
3910   /* set new location */
3911   g_free (queue->temp_template);
3912   queue->temp_template = g_strdup (template);
3913
3914   return;
3915
3916 /* ERROR */
3917 wrong_state:
3918   {
3919     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3920     GST_OBJECT_UNLOCK (queue);
3921   }
3922 }
3923
3924 static void
3925 gst_queue2_set_property (GObject * object,
3926     guint prop_id, const GValue * value, GParamSpec * pspec)
3927 {
3928   GstQueue2 *queue = GST_QUEUE2 (object);
3929
3930   /* someone could change levels here, and since this
3931    * affects the get/put funcs, we need to lock for safety. */
3932   GST_QUEUE2_MUTEX_LOCK (queue);
3933
3934   switch (prop_id) {
3935     case PROP_MAX_SIZE_BYTES:
3936       queue->max_level.bytes = g_value_get_uint (value);
3937       QUEUE_CAPACITY_CHANGE (queue);
3938       break;
3939     case PROP_MAX_SIZE_BUFFERS:
3940       queue->max_level.buffers = g_value_get_uint (value);
3941       QUEUE_CAPACITY_CHANGE (queue);
3942       break;
3943     case PROP_MAX_SIZE_TIME:
3944       queue->max_level.time = g_value_get_uint64 (value);
3945       /* set rate_time to the same value. We use an extra field in the level
3946        * structure so that we can easily access and compare it */
3947       queue->max_level.rate_time = queue->max_level.time;
3948       QUEUE_CAPACITY_CHANGE (queue);
3949       break;
3950     case PROP_USE_BUFFERING:
3951       queue->use_buffering = g_value_get_boolean (value);
3952       if (!queue->use_buffering && queue->is_buffering) {
3953         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3954             "posting 100%% message");
3955         SET_PERCENT (queue, 100);
3956         queue->is_buffering = FALSE;
3957       }
3958
3959       if (queue->use_buffering) {
3960         queue->is_buffering = TRUE;
3961         update_buffering (queue);
3962       }
3963       break;
3964     case PROP_USE_TAGS_BITRATE:
3965       queue->use_tags_bitrate = g_value_get_boolean (value);
3966       break;
3967     case PROP_USE_RATE_ESTIMATE:
3968       queue->use_rate_estimate = g_value_get_boolean (value);
3969       break;
3970     case PROP_LOW_PERCENT:
3971       queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3972       if (queue->is_buffering)
3973         update_buffering (queue);
3974       break;
3975     case PROP_HIGH_PERCENT:
3976       queue->high_watermark =
3977           g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3978       if (queue->is_buffering)
3979         update_buffering (queue);
3980       break;
3981     case PROP_LOW_WATERMARK:
3982       queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3983       if (queue->is_buffering)
3984         update_buffering (queue);
3985       break;
3986     case PROP_HIGH_WATERMARK:
3987       queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3988       if (queue->is_buffering)
3989         update_buffering (queue);
3990       break;
3991     case PROP_TEMP_TEMPLATE:
3992       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3993       break;
3994     case PROP_TEMP_REMOVE:
3995       queue->temp_remove = g_value_get_boolean (value);
3996       break;
3997     case PROP_RING_BUFFER_MAX_SIZE:
3998       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3999       break;
4000 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
4001     case PROP_BUFFER_MODE:
4002       queue->mode = g_value_get_enum (value);
4003       break;
4004 #endif
4005     default:
4006       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4007       break;
4008   }
4009
4010   GST_QUEUE2_MUTEX_UNLOCK (queue);
4011   gst_queue2_post_buffering (queue);
4012 }
4013
4014 static void
4015 gst_queue2_get_property (GObject * object,
4016     guint prop_id, GValue * value, GParamSpec * pspec)
4017 {
4018   GstQueue2 *queue = GST_QUEUE2 (object);
4019
4020   GST_QUEUE2_MUTEX_LOCK (queue);
4021
4022   switch (prop_id) {
4023     case PROP_CUR_LEVEL_BYTES:
4024       g_value_set_uint (value, queue->cur_level.bytes);
4025       break;
4026     case PROP_CUR_LEVEL_BUFFERS:
4027       g_value_set_uint (value, queue->cur_level.buffers);
4028       break;
4029     case PROP_CUR_LEVEL_TIME:
4030       g_value_set_uint64 (value, queue->cur_level.time);
4031       break;
4032     case PROP_MAX_SIZE_BYTES:
4033       g_value_set_uint (value, queue->max_level.bytes);
4034       break;
4035     case PROP_MAX_SIZE_BUFFERS:
4036       g_value_set_uint (value, queue->max_level.buffers);
4037       break;
4038     case PROP_MAX_SIZE_TIME:
4039       g_value_set_uint64 (value, queue->max_level.time);
4040       break;
4041     case PROP_USE_BUFFERING:
4042       g_value_set_boolean (value, queue->use_buffering);
4043       break;
4044     case PROP_USE_TAGS_BITRATE:
4045       g_value_set_boolean (value, queue->use_tags_bitrate);
4046       break;
4047     case PROP_USE_RATE_ESTIMATE:
4048       g_value_set_boolean (value, queue->use_rate_estimate);
4049       break;
4050     case PROP_LOW_PERCENT:
4051       g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
4052       break;
4053     case PROP_HIGH_PERCENT:
4054       g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
4055       break;
4056     case PROP_LOW_WATERMARK:
4057       g_value_set_double (value, queue->low_watermark /
4058           (gdouble) MAX_BUFFERING_LEVEL);
4059       break;
4060     case PROP_HIGH_WATERMARK:
4061       g_value_set_double (value, queue->high_watermark /
4062           (gdouble) MAX_BUFFERING_LEVEL);
4063       break;
4064     case PROP_TEMP_TEMPLATE:
4065       g_value_set_string (value, queue->temp_template);
4066       break;
4067     case PROP_TEMP_LOCATION:
4068       g_value_set_string (value, queue->temp_location);
4069       break;
4070     case PROP_TEMP_REMOVE:
4071       g_value_set_boolean (value, queue->temp_remove);
4072       break;
4073     case PROP_RING_BUFFER_MAX_SIZE:
4074       g_value_set_uint64 (value, queue->ring_buffer_max_size);
4075       break;
4076     case PROP_AVG_IN_RATE:
4077     {
4078       gdouble in_rate = queue->byte_in_rate;
4079
4080       /* During the first RATE_INTERVAL, byte_in_rate will not have been
4081        * calculated, so calculate it here. */
4082       if (in_rate == 0.0 && queue->bytes_in
4083           && queue->last_update_in_rates_elapsed > 0.0)
4084         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
4085
4086       g_value_set_int64 (value, (gint64) in_rate);
4087       break;
4088     }
4089 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
4090     case PROP_BUFFER_MODE:
4091       g_value_set_enum (value, queue->mode);
4092       break;
4093 #endif
4094     default:
4095       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4096       break;
4097   }
4098
4099   GST_QUEUE2_MUTEX_UNLOCK (queue);
4100 }