coreelements: mark properties with MUTABLE_PLAYING
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:element-queue2
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue2:current-level-buffers property.
40  *
41  * The default queue size limits are 100 buffers, 2MB of data, or
42  * two seconds worth of data, whichever is reached first.
43  *
44  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
45  * will allocate a random free filename and buffer data in the file.
46  * By using this, it will buffer the entire stream data on the file independently
47  * of the queue size limits, they will only be used for buffering statistics.
48  *
49  * The temp-location property will be used to notify the application of the
50  * allocated filename.
51  */
52
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56
57 #include "gstqueue2.h"
58
59 #include <glib/gstdio.h>
60
61 #include "gst/gst-i18n-lib.h"
62 #include "gst/glib-compat-private.h"
63
64 #include <string.h>
65
66 #ifdef G_OS_WIN32
67 #include <io.h>                 /* lseek, open, close, read */
68 #undef lseek
69 #define lseek _lseeki64
70 #undef off_t
71 #define off_t guint64
72 #else
73 #include <unistd.h>
74 #endif
75
76 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
77     GST_PAD_SINK,
78     GST_PAD_ALWAYS,
79     GST_STATIC_CAPS_ANY);
80
81 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
82     GST_PAD_SRC,
83     GST_PAD_ALWAYS,
84     GST_STATIC_CAPS_ANY);
85
86 GST_DEBUG_CATEGORY_STATIC (queue_debug);
87 #define GST_CAT_DEFAULT (queue_debug)
88 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
89
90 enum
91 {
92   LAST_SIGNAL
93 };
94
95 /* other defines */
96 #define DEFAULT_BUFFER_SIZE 4096
97 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
98 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
99 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
100
101 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
102
103 /* default property values */
104 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
105 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
106 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
107 #define DEFAULT_USE_BUFFERING      FALSE
108 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
109 #define DEFAULT_LOW_PERCENT        10
110 #define DEFAULT_HIGH_PERCENT       99
111 #define DEFAULT_TEMP_REMOVE        TRUE
112 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
113
114 enum
115 {
116   PROP_0,
117   PROP_CUR_LEVEL_BUFFERS,
118   PROP_CUR_LEVEL_BYTES,
119   PROP_CUR_LEVEL_TIME,
120   PROP_MAX_SIZE_BUFFERS,
121   PROP_MAX_SIZE_BYTES,
122   PROP_MAX_SIZE_TIME,
123   PROP_USE_BUFFERING,
124   PROP_USE_RATE_ESTIMATE,
125   PROP_LOW_PERCENT,
126   PROP_HIGH_PERCENT,
127   PROP_TEMP_TEMPLATE,
128   PROP_TEMP_LOCATION,
129   PROP_TEMP_REMOVE,
130   PROP_RING_BUFFER_MAX_SIZE,
131   PROP_LAST
132 };
133
134 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
135   l.buffers = 0;                                        \
136   l.bytes = 0;                                          \
137   l.time = 0;                                           \
138   l.rate_time = 0;                                      \
139 } G_STMT_END
140
141 #define STATUS(queue, pad, msg) \
142   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
143                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
144                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
145                       " ns, %"G_GUINT64_FORMAT" items", \
146                       GST_DEBUG_PAD_NAME (pad), \
147                       queue->cur_level.buffers, \
148                       queue->max_level.buffers, \
149                       queue->cur_level.bytes, \
150                       queue->max_level.bytes, \
151                       queue->cur_level.time, \
152                       queue->max_level.time, \
153                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
154                         queue->current->writing_pos - queue->current->max_reading_pos : \
155                         queue->queue.length))
156
157 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
158   g_mutex_lock (&q->qlock);                                              \
159 } G_STMT_END
160
161 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
162   GST_QUEUE2_MUTEX_LOCK (q);                                            \
163   if (res != GST_FLOW_OK)                                               \
164     goto label;                                                         \
165 } G_STMT_END
166
167 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
168   g_mutex_unlock (&q->qlock);                                            \
169 } G_STMT_END
170
171 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
172   STATUS (queue, q->sinkpad, "wait for DEL");                           \
173   q->waiting_del = TRUE;                                                \
174   g_cond_wait (&q->item_del, &queue->qlock);                              \
175   q->waiting_del = FALSE;                                               \
176   if (res != GST_FLOW_OK) {                                             \
177     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
178     goto label;                                                         \
179   }                                                                     \
180   STATUS (queue, q->sinkpad, "received DEL");                           \
181 } G_STMT_END
182
183 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
184   STATUS (queue, q->srcpad, "wait for ADD");                            \
185   q->waiting_add = TRUE;                                                \
186   g_cond_wait (&q->item_add, &q->qlock);                                  \
187   q->waiting_add = FALSE;                                               \
188   if (res != GST_FLOW_OK) {                                             \
189     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
190     goto label;                                                         \
191   }                                                                     \
192   STATUS (queue, q->srcpad, "received ADD");                            \
193 } G_STMT_END
194
195 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
196   if (q->waiting_del) {                                                 \
197     STATUS (q, q->srcpad, "signal DEL");                                \
198     g_cond_signal (&q->item_del);                                        \
199   }                                                                     \
200 } G_STMT_END
201
202 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
203   if (q->waiting_add) {                                                 \
204     STATUS (q, q->sinkpad, "signal ADD");                               \
205     g_cond_signal (&q->item_add);                                        \
206   }                                                                     \
207 } G_STMT_END
208
209 #define _do_init \
210     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
211     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
212         "dataflow inside the queue element");
213 #define gst_queue2_parent_class parent_class
214 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
215
216 static void gst_queue2_finalize (GObject * object);
217
218 static void gst_queue2_set_property (GObject * object,
219     guint prop_id, const GValue * value, GParamSpec * pspec);
220 static void gst_queue2_get_property (GObject * object,
221     guint prop_id, GValue * value, GParamSpec * pspec);
222
223 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
224     GstBuffer * buffer);
225 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
226     GstBufferList * buffer_list);
227 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
228 static void gst_queue2_loop (GstPad * pad);
229
230 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
231     GstEvent * event);
232 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
233     GstQuery * query);
234
235 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
236     GstEvent * event);
237 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
238     GstQuery * query);
239 static gboolean gst_queue2_handle_query (GstElement * element,
240     GstQuery * query);
241
242 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
243     guint64 offset, guint length, GstBuffer ** buffer);
244
245 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
246     GstPadMode mode, gboolean active);
247 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
248     GstPadMode mode, gboolean active);
249 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
250     GstStateChange transition);
251
252 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
253 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
254
255 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
256 static void update_in_rates (GstQueue2 * queue);
257
258 typedef enum
259 {
260   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
261   GST_QUEUE2_ITEM_TYPE_BUFFER,
262   GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
263   GST_QUEUE2_ITEM_TYPE_EVENT,
264   GST_QUEUE2_ITEM_TYPE_QUERY
265 } GstQueue2ItemType;
266
267 typedef struct
268 {
269   GstQueue2ItemType type;
270   GstMiniObject *item;
271 } GstQueue2Item;
272
273 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
274
275 static void
276 gst_queue2_class_init (GstQueue2Class * klass)
277 {
278   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
279   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
280
281   gobject_class->set_property = gst_queue2_set_property;
282   gobject_class->get_property = gst_queue2_get_property;
283
284   /* properties */
285   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
286       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
287           "Current amount of data in the queue (bytes)",
288           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
289   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
290       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
291           "Current number of buffers in the queue",
292           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
293   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
294       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
295           "Current amount of data in the queue (in ns)",
296           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
297
298   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
299       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
300           "Max. amount of data in the queue (bytes, 0=disable)",
301           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
302           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
303           G_PARAM_STATIC_STRINGS));
304   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
305       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
306           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
307           DEFAULT_MAX_SIZE_BUFFERS,
308           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
309           G_PARAM_STATIC_STRINGS));
310   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
311       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
312           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
313           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
314           G_PARAM_STATIC_STRINGS));
315
316   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
317       g_param_spec_boolean ("use-buffering", "Use buffering",
318           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
319           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
320           G_PARAM_STATIC_STRINGS));
321   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
322       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
323           "Estimate the bitrate of the stream to calculate time level",
324           DEFAULT_USE_RATE_ESTIMATE,
325           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
326   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
327       g_param_spec_int ("low-percent", "Low percent",
328           "Low threshold for buffering to start. Only used if use-buffering is True",
329           0, 100, DEFAULT_LOW_PERCENT,
330           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
331   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
332       g_param_spec_int ("high-percent", "High percent",
333           "High threshold for buffering to finish. Only used if use-buffering is True",
334           0, 100, DEFAULT_HIGH_PERCENT,
335           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
336
337   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
338       g_param_spec_string ("temp-template", "Temporary File Template",
339           "File template to store temporary files in, should contain directory "
340           "and XXXXXX. (NULL == disabled)",
341           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
342
343   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
344       g_param_spec_string ("temp-location", "Temporary File Location",
345           "Location to store temporary files in (Only read this property, "
346           "use temp-template to configure the name template)",
347           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
348
349   /**
350    * GstQueue2:temp-remove
351    *
352    * When temp-template is set, remove the temporary file when going to READY.
353    */
354   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
355       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
356           "Remove the temp-location after use",
357           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
358
359   /**
360    * GstQueue2:ring-buffer-max-size
361    *
362    * The maximum size of the ring buffer in bytes. If set to 0, the ring
363    * buffer is disabled. Default 0.
364    */
365   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
366       g_param_spec_uint64 ("ring-buffer-max-size",
367           "Max. ring buffer size (bytes)",
368           "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
369           0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
370           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371
372   /* set several parent class virtual functions */
373   gobject_class->finalize = gst_queue2_finalize;
374
375   gst_element_class_add_pad_template (gstelement_class,
376       gst_static_pad_template_get (&srctemplate));
377   gst_element_class_add_pad_template (gstelement_class,
378       gst_static_pad_template_get (&sinktemplate));
379
380   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
381       "Generic",
382       "Simple data queue",
383       "Erik Walthinsen <omega@cse.ogi.edu>, "
384       "Wim Taymans <wim.taymans@gmail.com>");
385
386   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
387   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
388 }
389
390 static void
391 gst_queue2_init (GstQueue2 * queue)
392 {
393   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
394
395   gst_pad_set_chain_function (queue->sinkpad,
396       GST_DEBUG_FUNCPTR (gst_queue2_chain));
397   gst_pad_set_chain_list_function (queue->sinkpad,
398       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
399   gst_pad_set_activatemode_function (queue->sinkpad,
400       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
401   gst_pad_set_event_function (queue->sinkpad,
402       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
403   gst_pad_set_query_function (queue->sinkpad,
404       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
405   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
406   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
407
408   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
409
410   gst_pad_set_activatemode_function (queue->srcpad,
411       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
412   gst_pad_set_getrange_function (queue->srcpad,
413       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
414   gst_pad_set_event_function (queue->srcpad,
415       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
416   gst_pad_set_query_function (queue->srcpad,
417       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
418   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
419   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
420
421   /* levels */
422   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
423   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
424   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
425   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
426   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
427   queue->use_buffering = DEFAULT_USE_BUFFERING;
428   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
429   queue->low_percent = DEFAULT_LOW_PERCENT;
430   queue->high_percent = DEFAULT_HIGH_PERCENT;
431
432   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
433   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
434
435   queue->sinktime = GST_CLOCK_TIME_NONE;
436   queue->srctime = GST_CLOCK_TIME_NONE;
437   queue->sink_tainted = TRUE;
438   queue->src_tainted = TRUE;
439
440   queue->srcresult = GST_FLOW_FLUSHING;
441   queue->sinkresult = GST_FLOW_FLUSHING;
442   queue->is_eos = FALSE;
443   queue->in_timer = g_timer_new ();
444   queue->out_timer = g_timer_new ();
445
446   g_mutex_init (&queue->qlock);
447   queue->waiting_add = FALSE;
448   g_cond_init (&queue->item_add);
449   queue->waiting_del = FALSE;
450   g_cond_init (&queue->item_del);
451   g_queue_init (&queue->queue);
452
453   g_cond_init (&queue->query_handled);
454   queue->last_query = FALSE;
455
456   queue->buffering_percent = 100;
457
458   /* tempfile related */
459   queue->temp_template = NULL;
460   queue->temp_location = NULL;
461   queue->temp_remove = DEFAULT_TEMP_REMOVE;
462
463   queue->ring_buffer = NULL;
464   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
465
466   GST_DEBUG_OBJECT (queue,
467       "initialized queue's not_empty & not_full conditions");
468 }
469
470 /* called only once, as opposed to dispose */
471 static void
472 gst_queue2_finalize (GObject * object)
473 {
474   GstQueue2 *queue = GST_QUEUE2 (object);
475
476   GST_DEBUG_OBJECT (queue, "finalizing queue");
477
478   while (!g_queue_is_empty (&queue->queue)) {
479     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
480
481     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
482       gst_mini_object_unref (qitem->item);
483     g_slice_free (GstQueue2Item, qitem);
484   }
485
486   queue->last_query = FALSE;
487   g_queue_clear (&queue->queue);
488   g_mutex_clear (&queue->qlock);
489   g_cond_clear (&queue->item_add);
490   g_cond_clear (&queue->item_del);
491   g_cond_clear (&queue->query_handled);
492   g_timer_destroy (queue->in_timer);
493   g_timer_destroy (queue->out_timer);
494
495   /* temp_file path cleanup  */
496   g_free (queue->temp_template);
497   g_free (queue->temp_location);
498
499   G_OBJECT_CLASS (parent_class)->finalize (object);
500 }
501
502 static void
503 debug_ranges (GstQueue2 * queue)
504 {
505   GstQueue2Range *walk;
506
507   for (walk = queue->ranges; walk; walk = walk->next) {
508     GST_DEBUG_OBJECT (queue,
509         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
510         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
511         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
512         walk->rb_writing_pos, walk->reading_pos,
513         walk == queue->current ? "**y**" : "  n  ");
514   }
515 }
516
517 /* clear all the downloaded ranges */
518 static void
519 clean_ranges (GstQueue2 * queue)
520 {
521   GST_DEBUG_OBJECT (queue, "clean queue ranges");
522
523   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
524   queue->ranges = NULL;
525   queue->current = NULL;
526 }
527
528 /* find a range that contains @offset or NULL when nothing does */
529 static GstQueue2Range *
530 find_range (GstQueue2 * queue, guint64 offset)
531 {
532   GstQueue2Range *range = NULL;
533   GstQueue2Range *walk;
534
535   /* first do a quick check for the current range */
536   for (walk = queue->ranges; walk; walk = walk->next) {
537     if (offset >= walk->offset && offset <= walk->writing_pos) {
538       /* we can reuse an existing range */
539       range = walk;
540       break;
541     }
542   }
543   if (range) {
544     GST_DEBUG_OBJECT (queue,
545         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
546         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
547   } else {
548     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
549   }
550   return range;
551 }
552
553 static void
554 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
555 {
556   guint64 max_reading_pos, writing_pos;
557
558   writing_pos = range->writing_pos;
559   max_reading_pos = range->max_reading_pos;
560
561   if (writing_pos > max_reading_pos)
562     queue->cur_level.bytes = writing_pos - max_reading_pos;
563   else
564     queue->cur_level.bytes = 0;
565 }
566
567 /* make a new range for @offset or reuse an existing range */
568 static GstQueue2Range *
569 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
570 {
571   GstQueue2Range *range, *prev, *next;
572
573   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
574
575   if ((range = find_range (queue, offset))) {
576     GST_DEBUG_OBJECT (queue,
577         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
578         range->writing_pos);
579     if (update_existing && range->writing_pos != offset) {
580       GST_DEBUG_OBJECT (queue, "updating range writing position to "
581           "%" G_GUINT64_FORMAT, offset);
582       range->writing_pos = offset;
583     }
584   } else {
585     GST_DEBUG_OBJECT (queue,
586         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
587
588     range = g_slice_new0 (GstQueue2Range);
589     range->offset = offset;
590     /* we want to write to the next location in the ring buffer */
591     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
592     range->writing_pos = offset;
593     range->rb_writing_pos = range->rb_offset;
594     range->reading_pos = offset;
595     range->max_reading_pos = offset;
596
597     /* insert sorted */
598     prev = NULL;
599     next = queue->ranges;
600     while (next) {
601       if (next->offset > offset) {
602         /* insert before next */
603         GST_DEBUG_OBJECT (queue,
604             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
605             next->offset);
606         break;
607       }
608       /* try next */
609       prev = next;
610       next = next->next;
611     }
612     range->next = next;
613     if (prev)
614       prev->next = range;
615     else
616       queue->ranges = range;
617   }
618   debug_ranges (queue);
619
620   /* update the stats for this range */
621   update_cur_level (queue, range);
622
623   return range;
624 }
625
626
627 /* clear and init the download ranges for offset 0 */
628 static void
629 init_ranges (GstQueue2 * queue)
630 {
631   GST_DEBUG_OBJECT (queue, "init queue ranges");
632
633   /* get rid of all the current ranges */
634   clean_ranges (queue);
635   /* make a range for offset 0 */
636   queue->current = add_range (queue, 0, TRUE);
637 }
638
639 /* calculate the diff between running time on the sink and src of the queue.
640  * This is the total amount of time in the queue. */
641 static void
642 update_time_level (GstQueue2 * queue)
643 {
644   if (queue->sink_tainted) {
645     queue->sinktime =
646         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
647         queue->sink_segment.position);
648     queue->sink_tainted = FALSE;
649   }
650
651   if (queue->src_tainted) {
652     queue->srctime =
653         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
654         queue->src_segment.position);
655     queue->src_tainted = FALSE;
656   }
657
658   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
659       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
660
661   if (queue->sinktime != GST_CLOCK_TIME_NONE
662       && queue->srctime != GST_CLOCK_TIME_NONE
663       && queue->sinktime >= queue->srctime)
664     queue->cur_level.time = queue->sinktime - queue->srctime;
665   else
666     queue->cur_level.time = 0;
667 }
668
669 /* take a SEGMENT event and apply the values to segment, updating the time
670  * level of queue. */
671 static void
672 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
673     gboolean is_sink)
674 {
675   gst_event_copy_segment (event, segment);
676
677   if (segment->format == GST_FORMAT_BYTES) {
678     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
679       /* start is where we'll be getting from and as such writing next */
680       queue->current = add_range (queue, segment->start, TRUE);
681     }
682   }
683
684   /* now configure the values, we use these to track timestamps on the
685    * sinkpad. */
686   if (segment->format != GST_FORMAT_TIME) {
687     /* non-time format, pretent the current time segment is closed with a
688      * 0 start and unknown stop time. */
689     segment->format = GST_FORMAT_TIME;
690     segment->start = 0;
691     segment->stop = -1;
692     segment->time = 0;
693   }
694
695   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
696
697   if (is_sink)
698     queue->sink_tainted = TRUE;
699   else
700     queue->src_tainted = TRUE;
701
702   /* segment can update the time level of the queue */
703   update_time_level (queue);
704 }
705
706 /* take a buffer and update segment, updating the time level of the queue. */
707 static void
708 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
709     gboolean is_sink)
710 {
711   GstClockTime duration, timestamp;
712
713   timestamp = GST_BUFFER_TIMESTAMP (buffer);
714   duration = GST_BUFFER_DURATION (buffer);
715
716   /* if no timestamp is set, assume it's continuous with the previous
717    * time */
718   if (timestamp == GST_CLOCK_TIME_NONE)
719     timestamp = segment->position;
720
721   /* add duration */
722   if (duration != GST_CLOCK_TIME_NONE)
723     timestamp += duration;
724
725   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
726       GST_TIME_ARGS (timestamp));
727
728   segment->position = timestamp;
729
730   if (is_sink)
731     queue->sink_tainted = TRUE;
732   else
733     queue->src_tainted = TRUE;
734
735   /* calc diff with other end */
736   update_time_level (queue);
737 }
738
739 static gboolean
740 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
741 {
742   GstClockTime *timestamp = data;
743
744   GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT
745       " duration %" GST_TIME_FORMAT, idx,
746       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)),
747       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
748
749   if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf))
750     *timestamp = GST_BUFFER_TIMESTAMP (*buf);
751
752   if (GST_BUFFER_DURATION_IS_VALID (*buf))
753     *timestamp += GST_BUFFER_DURATION (*buf);
754
755   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
756   return TRUE;
757 }
758
759 /* take a buffer list and update segment, updating the time level of the queue */
760 static void
761 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
762     GstSegment * segment, gboolean is_sink)
763 {
764   GstClockTime timestamp;
765
766   /* if no timestamp is set, assume it's continuous with the previous time */
767   timestamp = segment->position;
768
769   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
770
771   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
772       GST_TIME_ARGS (timestamp));
773
774   segment->position = timestamp;
775
776   if (is_sink)
777     queue->sink_tainted = TRUE;
778   else
779     queue->src_tainted = TRUE;
780
781   /* calc diff with other end */
782   update_time_level (queue);
783 }
784
785 static gboolean
786 get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering,
787     gint * percent)
788 {
789   gint perc;
790
791   if (queue->high_percent <= 0) {
792     if (percent)
793       *percent = 100;
794     if (is_buffering)
795       *is_buffering = FALSE;
796     return FALSE;
797   }
798 #define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
799
800   if (queue->is_eos) {
801     /* on EOS we are always 100% full, we set the var here so that it we can
802      * reuse the logic below to stop buffering */
803     perc = 100;
804     GST_LOG_OBJECT (queue, "we are EOS");
805   } else {
806     /* figure out the percent we are filled, we take the max of all formats. */
807     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
808       perc = GET_PERCENT (bytes, 0);
809     } else {
810       guint64 rb_size = queue->ring_buffer_max_size;
811       perc = GET_PERCENT (bytes, rb_size);
812     }
813     perc = MAX (perc, GET_PERCENT (time, 0));
814     perc = MAX (perc, GET_PERCENT (buffers, 0));
815
816     /* also apply the rate estimate when we need to */
817     if (queue->use_rate_estimate)
818       perc = MAX (perc, GET_PERCENT (rate_time, 0));
819   }
820 #undef GET_PERCENT
821
822   if (is_buffering)
823     *is_buffering = queue->is_buffering;
824
825   /* scale to high percent so that it becomes the 100% mark */
826   perc = perc * 100 / queue->high_percent;
827   /* clip */
828   if (perc > 100)
829     perc = 100;
830
831   if (percent)
832     *percent = perc;
833
834   GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering,
835       perc);
836
837   return TRUE;
838 }
839
840 static void
841 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
842     gint * avg_in, gint * avg_out, gint64 * buffering_left)
843 {
844   if (mode) {
845     if (!QUEUE_IS_USING_QUEUE (queue)) {
846       if (QUEUE_IS_USING_RING_BUFFER (queue))
847         *mode = GST_BUFFERING_TIMESHIFT;
848       else
849         *mode = GST_BUFFERING_DOWNLOAD;
850     } else {
851       *mode = GST_BUFFERING_STREAM;
852     }
853   }
854
855   if (avg_in)
856     *avg_in = queue->byte_in_rate;
857   if (avg_out)
858     *avg_out = queue->byte_out_rate;
859
860   if (buffering_left) {
861     *buffering_left = (percent == 100 ? 0 : -1);
862
863     if (queue->use_rate_estimate) {
864       guint64 max, cur;
865
866       max = queue->max_level.rate_time;
867       cur = queue->cur_level.rate_time;
868
869       if (percent != 100 && max > cur)
870         *buffering_left = (max - cur) / 1000000;
871     }
872   }
873 }
874
875 static void
876 update_buffering (GstQueue2 * queue)
877 {
878   gint percent;
879   gboolean post = FALSE;
880
881   /* Ensure the variables used to calculate buffering state are up-to-date. */
882   if (queue->current)
883     update_cur_level (queue, queue->current);
884   update_in_rates (queue);
885
886   if (!get_buffering_percent (queue, NULL, &percent))
887     return;
888
889   if (queue->is_buffering) {
890     post = TRUE;
891     /* if we were buffering see if we reached the high watermark */
892     if (percent >= queue->high_percent)
893       queue->is_buffering = FALSE;
894   } else {
895     /* we were not buffering, check if we need to start buffering if we drop
896      * below the low threshold */
897     if (percent < queue->low_percent) {
898       queue->is_buffering = TRUE;
899       post = TRUE;
900     }
901   }
902
903   if (post) {
904     if (percent == queue->buffering_percent)
905       post = FALSE;
906     else
907       queue->buffering_percent = percent;
908   }
909
910   if (post) {
911     GstMessage *message;
912     GstBufferingMode mode;
913     gint avg_in, avg_out;
914     gint64 buffering_left;
915
916     get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
917         &buffering_left);
918
919     message = gst_message_new_buffering (GST_OBJECT_CAST (queue),
920         (gint) percent);
921     gst_message_set_buffering_stats (message, mode,
922         avg_in, avg_out, buffering_left);
923
924     gst_element_post_message (GST_ELEMENT_CAST (queue), message);
925   }
926 }
927
928 static void
929 reset_rate_timer (GstQueue2 * queue)
930 {
931   queue->bytes_in = 0;
932   queue->bytes_out = 0;
933   queue->byte_in_rate = 0.0;
934   queue->byte_in_period = 0;
935   queue->byte_out_rate = 0.0;
936   queue->last_in_elapsed = 0.0;
937   queue->last_out_elapsed = 0.0;
938   queue->in_timer_started = FALSE;
939   queue->out_timer_started = FALSE;
940 }
941
942 /* the interval in seconds to recalculate the rate */
943 #define RATE_INTERVAL    0.2
944 /* Tuning for rate estimation. We use a large window for the input rate because
945  * it should be stable when connected to a network. The output rate is less
946  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
947  * therefore adapt more quickly.
948  * However, initial input rate may be subject to a burst, and should therefore
949  * initially also adapt more quickly to changes, and only later on give higher
950  * weight to previous values. */
951 #define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
952 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
953
954 static void
955 update_in_rates (GstQueue2 * queue)
956 {
957   gdouble elapsed, period;
958   gdouble byte_in_rate;
959
960   if (!queue->in_timer_started) {
961     queue->in_timer_started = TRUE;
962     g_timer_start (queue->in_timer);
963     return;
964   }
965
966   elapsed = g_timer_elapsed (queue->in_timer, NULL);
967
968   /* recalc after each interval. */
969   if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
970     period = elapsed - queue->last_in_elapsed;
971
972     GST_DEBUG_OBJECT (queue,
973         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
974         period, queue->bytes_in, queue->byte_in_period);
975
976     byte_in_rate = queue->bytes_in / period;
977
978     if (queue->byte_in_rate == 0.0)
979       queue->byte_in_rate = byte_in_rate;
980     else
981       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
982           (double) queue->byte_in_period, period);
983
984     /* another data point, cap at 16 for long time running average */
985     if (queue->byte_in_period < 16 * RATE_INTERVAL)
986       queue->byte_in_period += period;
987
988     /* reset the values to calculate rate over the next interval */
989     queue->last_in_elapsed = elapsed;
990     queue->bytes_in = 0;
991   }
992
993   if (queue->byte_in_rate > 0.0) {
994     queue->cur_level.rate_time =
995         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
996   }
997   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
998       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
999 }
1000
1001 static void
1002 update_out_rates (GstQueue2 * queue)
1003 {
1004   gdouble elapsed, period;
1005   gdouble byte_out_rate;
1006
1007   if (!queue->out_timer_started) {
1008     queue->out_timer_started = TRUE;
1009     g_timer_start (queue->out_timer);
1010     return;
1011   }
1012
1013   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1014
1015   /* recalc after each interval. */
1016   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1017     period = elapsed - queue->last_out_elapsed;
1018
1019     GST_DEBUG_OBJECT (queue,
1020         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1021
1022     byte_out_rate = queue->bytes_out / period;
1023
1024     if (queue->byte_out_rate == 0.0)
1025       queue->byte_out_rate = byte_out_rate;
1026     else
1027       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1028
1029     /* reset the values to calculate rate over the next interval */
1030     queue->last_out_elapsed = elapsed;
1031     queue->bytes_out = 0;
1032   }
1033   if (queue->byte_in_rate > 0.0) {
1034     queue->cur_level.rate_time =
1035         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1036   }
1037   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1038       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1039 }
1040
1041 static void
1042 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1043 {
1044   guint64 reading_pos, max_reading_pos;
1045
1046   reading_pos = pos;
1047   max_reading_pos = range->max_reading_pos;
1048
1049   max_reading_pos = MAX (max_reading_pos, reading_pos);
1050
1051   GST_DEBUG_OBJECT (queue,
1052       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1053       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1054   range->max_reading_pos = max_reading_pos;
1055
1056   update_cur_level (queue, range);
1057 }
1058
1059 static gboolean
1060 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1061 {
1062   GstEvent *event;
1063   gboolean res;
1064
1065   /* until we receive the FLUSH_STOP from this seek, we skip data */
1066   queue->seeking = TRUE;
1067   GST_QUEUE2_MUTEX_UNLOCK (queue);
1068
1069   debug_ranges (queue);
1070
1071   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1072
1073   event =
1074       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1075       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1076       GST_SEEK_TYPE_NONE, -1);
1077
1078   res = gst_pad_push_event (queue->sinkpad, event);
1079   GST_QUEUE2_MUTEX_LOCK (queue);
1080
1081   if (res) {
1082     /* Between us sending the seek event and re-acquiring the lock, the source
1083      * thread might already have pushed data and moved along the range's
1084      * writing_pos beyond the seek offset. In that case we don't want to set
1085      * the writing position back to the requested seek position, as it would
1086      * cause data to be written to the wrong offset in the file or ring buffer.
1087      * We still do the add_range call to switch the current range to the
1088      * requested range, or create one if one doesn't exist yet. */
1089     queue->current = add_range (queue, offset, FALSE);
1090   }
1091
1092   return res;
1093 }
1094
1095 /* get the threshold for when we decide to seek rather than wait */
1096 static guint64
1097 get_seek_threshold (GstQueue2 * queue)
1098 {
1099   guint64 threshold;
1100
1101   /* FIXME, find a good threshold based on the incoming rate. */
1102   threshold = 1024 * 512;
1103
1104   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1105     threshold = MIN (threshold,
1106         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1107   }
1108   return threshold;
1109 }
1110
1111 /* see if there is enough data in the file to read a full buffer */
1112 static gboolean
1113 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1114 {
1115   GstQueue2Range *range;
1116
1117   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1118       offset, length);
1119
1120   if ((range = find_range (queue, offset))) {
1121     if (queue->current != range) {
1122       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1123       perform_seek_to_offset (queue, range->writing_pos);
1124     }
1125
1126     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1127         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1128
1129     /* we have a range for offset */
1130     GST_DEBUG_OBJECT (queue,
1131         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1132         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1133
1134     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1135       return TRUE;
1136
1137     if (offset + length <= range->writing_pos)
1138       return TRUE;
1139     else
1140       GST_DEBUG_OBJECT (queue,
1141           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1142           (offset + length) - range->writing_pos);
1143
1144   } else {
1145     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1146         " len %u", offset, length);
1147     /* we don't have the range, see how far away we are */
1148     if (!queue->is_eos && queue->current) {
1149       guint64 threshold = get_seek_threshold (queue);
1150
1151       if (offset >= queue->current->offset && offset <=
1152           queue->current->writing_pos + threshold) {
1153         GST_INFO_OBJECT (queue,
1154             "requested data is within range, wait for data");
1155         return FALSE;
1156       }
1157     }
1158
1159     /* too far away, do a seek */
1160     perform_seek_to_offset (queue, offset);
1161   }
1162
1163   return FALSE;
1164 }
1165
1166 #ifdef HAVE_FSEEKO
1167 #define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1168 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1169 #define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1170 #else
1171 #define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
1172 #endif
1173
1174 static GstFlowReturn
1175 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1176     guint8 * dst, gint64 * read_return)
1177 {
1178   guint8 *ring_buffer;
1179   size_t res;
1180
1181   ring_buffer = queue->ring_buffer;
1182
1183   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1184     goto seek_failed;
1185
1186   /* this should not block */
1187   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1188       length, offset);
1189   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1190     res = fread (dst, 1, length, queue->temp_file);
1191   } else {
1192     memcpy (dst, ring_buffer + offset, length);
1193     res = length;
1194   }
1195
1196   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1197
1198   if (G_UNLIKELY (res < length)) {
1199     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1200       goto could_not_read;
1201     /* check for errors or EOF */
1202     if (ferror (queue->temp_file))
1203       goto could_not_read;
1204     if (feof (queue->temp_file) && length > 0)
1205       goto eos;
1206   }
1207
1208   *read_return = res;
1209
1210   return GST_FLOW_OK;
1211
1212 seek_failed:
1213   {
1214     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1215     return GST_FLOW_ERROR;
1216   }
1217 could_not_read:
1218   {
1219     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1220     return GST_FLOW_ERROR;
1221   }
1222 eos:
1223   {
1224     GST_DEBUG ("non-regular file hits EOS");
1225     return GST_FLOW_EOS;
1226   }
1227 }
1228
1229 static GstFlowReturn
1230 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1231     GstBuffer ** buffer)
1232 {
1233   GstBuffer *buf;
1234   GstMapInfo info;
1235   guint8 *data;
1236   guint64 file_offset;
1237   guint block_length, remaining, read_length;
1238   guint64 rb_size;
1239   guint64 max_size;
1240   guint64 rpos;
1241   GstFlowReturn ret = GST_FLOW_OK;
1242
1243   /* allocate the output buffer of the requested size */
1244   if (*buffer == NULL)
1245     buf = gst_buffer_new_allocate (NULL, length, NULL);
1246   else
1247     buf = *buffer;
1248
1249   gst_buffer_map (buf, &info, GST_MAP_WRITE);
1250   data = info.data;
1251
1252   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1253       offset);
1254
1255   rpos = offset;
1256   rb_size = queue->ring_buffer_max_size;
1257   max_size = QUEUE_MAX_BYTES (queue);
1258
1259   remaining = length;
1260   while (remaining > 0) {
1261     /* configure how much/whether to read */
1262     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1263       read_length = 0;
1264
1265       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1266         guint64 level;
1267
1268         /* calculate how far away the offset is */
1269         if (queue->current->writing_pos > rpos)
1270           level = queue->current->writing_pos - rpos;
1271         else
1272           level = 0;
1273
1274         GST_DEBUG_OBJECT (queue,
1275             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1276             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1277             rpos, queue->current->writing_pos, level, max_size);
1278
1279         if (level >= max_size) {
1280           /* we don't have the data but if we have a ring buffer that is full, we
1281            * need to read */
1282           GST_DEBUG_OBJECT (queue,
1283               "ring buffer full, reading QUEUE_MAX_BYTES %"
1284               G_GUINT64_FORMAT " bytes", max_size);
1285           read_length = max_size;
1286         } else if (queue->is_eos) {
1287           /* won't get any more data so read any data we have */
1288           if (level) {
1289             GST_DEBUG_OBJECT (queue,
1290                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1291                 level);
1292             read_length = level;
1293             remaining = level;
1294             length = level;
1295           } else
1296             goto hit_eos;
1297         }
1298       }
1299
1300       if (read_length == 0) {
1301         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1302           GST_DEBUG_OBJECT (queue,
1303               "update current position [%" G_GUINT64_FORMAT "-%"
1304               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1305           update_cur_pos (queue, queue->current, rpos);
1306           GST_QUEUE2_SIGNAL_DEL (queue);
1307         }
1308
1309         if (queue->use_buffering)
1310           update_buffering (queue);
1311
1312         GST_DEBUG_OBJECT (queue, "waiting for add");
1313         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1314         continue;
1315       }
1316     } else {
1317       /* we have the requested data so read it */
1318       read_length = remaining;
1319     }
1320
1321     /* set range reading_pos to actual reading position for this read */
1322     queue->current->reading_pos = rpos;
1323
1324     /* configure how much and from where to read */
1325     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1326       file_offset =
1327           (queue->current->rb_offset + (rpos -
1328               queue->current->offset)) % rb_size;
1329       if (file_offset + read_length > rb_size) {
1330         block_length = rb_size - file_offset;
1331       } else {
1332         block_length = read_length;
1333       }
1334     } else {
1335       file_offset = rpos;
1336       block_length = read_length;
1337     }
1338
1339     /* while we still have data to read, we loop */
1340     while (read_length > 0) {
1341       gint64 read_return;
1342
1343       ret =
1344           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1345           data, &read_return);
1346       if (ret != GST_FLOW_OK)
1347         goto read_error;
1348
1349       file_offset += read_return;
1350       if (QUEUE_IS_USING_RING_BUFFER (queue))
1351         file_offset %= rb_size;
1352
1353       data += read_return;
1354       read_length -= read_return;
1355       block_length = read_length;
1356       remaining -= read_return;
1357
1358       rpos = (queue->current->reading_pos += read_return);
1359       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1360     }
1361     GST_QUEUE2_SIGNAL_DEL (queue);
1362     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1363   }
1364
1365   gst_buffer_unmap (buf, &info);
1366   gst_buffer_resize (buf, 0, length);
1367
1368   GST_BUFFER_OFFSET (buf) = offset;
1369   GST_BUFFER_OFFSET_END (buf) = offset + length;
1370
1371   *buffer = buf;
1372
1373   return ret;
1374
1375   /* ERRORS */
1376 hit_eos:
1377   {
1378     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1379     gst_buffer_unmap (buf, &info);
1380     if (*buffer == NULL)
1381       gst_buffer_unref (buf);
1382     return GST_FLOW_EOS;
1383   }
1384 out_flushing:
1385   {
1386     GST_DEBUG_OBJECT (queue, "we are flushing");
1387     gst_buffer_unmap (buf, &info);
1388     if (*buffer == NULL)
1389       gst_buffer_unref (buf);
1390     return GST_FLOW_FLUSHING;
1391   }
1392 read_error:
1393   {
1394     GST_DEBUG_OBJECT (queue, "we have a read error");
1395     gst_buffer_unmap (buf, &info);
1396     if (*buffer == NULL)
1397       gst_buffer_unref (buf);
1398     return ret;
1399   }
1400 }
1401
1402 /* should be called with QUEUE_LOCK */
1403 static GstMiniObject *
1404 gst_queue2_read_item_from_file (GstQueue2 * queue)
1405 {
1406   GstMiniObject *item;
1407
1408   if (queue->stream_start_event != NULL) {
1409     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1410     queue->stream_start_event = NULL;
1411   } else if (queue->starting_segment != NULL) {
1412     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1413     queue->starting_segment = NULL;
1414   } else {
1415     GstFlowReturn ret;
1416     GstBuffer *buffer = NULL;
1417     guint64 reading_pos;
1418
1419     reading_pos = queue->current->reading_pos;
1420
1421     ret =
1422         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1423         &buffer);
1424
1425     switch (ret) {
1426       case GST_FLOW_OK:
1427         item = GST_MINI_OBJECT_CAST (buffer);
1428         break;
1429       case GST_FLOW_EOS:
1430         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1431         break;
1432       default:
1433         item = NULL;
1434         break;
1435     }
1436   }
1437   return item;
1438 }
1439
1440 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1441  * the temp filename. */
1442 static gboolean
1443 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1444 {
1445   gint fd = -1;
1446   gchar *name = NULL;
1447
1448   if (queue->temp_file)
1449     goto already_opened;
1450
1451   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1452
1453   /* If temp_template was set, allocate a filename and open that filen */
1454
1455   /* nothing to do */
1456   if (queue->temp_template == NULL)
1457     goto no_directory;
1458
1459   /* make copy of the template, we don't want to change this */
1460   name = g_strdup (queue->temp_template);
1461   fd = g_mkstemp (name);
1462   if (fd == -1)
1463     goto mkstemp_failed;
1464
1465   /* open the file for update/writing */
1466   queue->temp_file = fdopen (fd, "wb+");
1467   /* error creating file */
1468   if (queue->temp_file == NULL)
1469     goto open_failed;
1470
1471   g_free (queue->temp_location);
1472   queue->temp_location = name;
1473
1474   GST_QUEUE2_MUTEX_UNLOCK (queue);
1475
1476   /* we can't emit the notify with the lock */
1477   g_object_notify (G_OBJECT (queue), "temp-location");
1478
1479   GST_QUEUE2_MUTEX_LOCK (queue);
1480
1481   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1482
1483   return TRUE;
1484
1485   /* ERRORS */
1486 already_opened:
1487   {
1488     GST_DEBUG_OBJECT (queue, "temp file was already open");
1489     return TRUE;
1490   }
1491 no_directory:
1492   {
1493     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1494         (_("No Temp directory specified.")), (NULL));
1495     return FALSE;
1496   }
1497 mkstemp_failed:
1498   {
1499     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1500         (_("Could not create temp file \"%s\"."), queue->temp_template),
1501         GST_ERROR_SYSTEM);
1502     g_free (name);
1503     return FALSE;
1504   }
1505 open_failed:
1506   {
1507     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1508         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1509     g_free (name);
1510     if (fd != -1)
1511       close (fd);
1512     return FALSE;
1513   }
1514 }
1515
1516 static void
1517 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1518 {
1519   /* nothing to do */
1520   if (queue->temp_file == NULL)
1521     return;
1522
1523   GST_DEBUG_OBJECT (queue, "closing temp file");
1524
1525   fflush (queue->temp_file);
1526   fclose (queue->temp_file);
1527
1528   if (queue->temp_remove) {
1529     if (remove (queue->temp_location) < 0) {
1530       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1531           queue->temp_location, g_strerror (errno));
1532     }
1533   }
1534
1535   queue->temp_file = NULL;
1536   clean_ranges (queue);
1537 }
1538
1539 static void
1540 gst_queue2_flush_temp_file (GstQueue2 * queue)
1541 {
1542   if (queue->temp_file == NULL)
1543     return;
1544
1545   GST_DEBUG_OBJECT (queue, "flushing temp file");
1546
1547   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1548 }
1549
1550 static void
1551 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1552 {
1553   if (!QUEUE_IS_USING_QUEUE (queue)) {
1554     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1555       gst_queue2_flush_temp_file (queue);
1556     init_ranges (queue);
1557   } else {
1558     while (!g_queue_is_empty (&queue->queue)) {
1559       GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1560
1561       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1562           && GST_EVENT_IS_STICKY (qitem->item)
1563           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1564           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1565         gst_pad_store_sticky_event (queue->srcpad,
1566             GST_EVENT_CAST (qitem->item));
1567       }
1568
1569       /* Then lose another reference because we are supposed to destroy that
1570          data when flushing */
1571       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1572         gst_mini_object_unref (qitem->item);
1573       g_slice_free (GstQueue2Item, qitem);
1574     }
1575   }
1576   queue->last_query = FALSE;
1577   g_cond_signal (&queue->query_handled);
1578   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1579   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1580   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1581   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1582   queue->sink_tainted = queue->src_tainted = TRUE;
1583   if (queue->starting_segment != NULL)
1584     gst_event_unref (queue->starting_segment);
1585   queue->starting_segment = NULL;
1586   queue->segment_event_received = FALSE;
1587   gst_event_replace (&queue->stream_start_event, NULL);
1588
1589   /* we deleted a lot of something */
1590   GST_QUEUE2_SIGNAL_DEL (queue);
1591 }
1592
1593 static gboolean
1594 gst_queue2_wait_free_space (GstQueue2 * queue)
1595 {
1596   /* We make space available if we're "full" according to whatever
1597    * the user defined as "full". */
1598   if (gst_queue2_is_filled (queue)) {
1599     gboolean started;
1600
1601     /* pause the timer while we wait. The fact that we are waiting does not mean
1602      * the byterate on the input pad is lower */
1603     if ((started = queue->in_timer_started))
1604       g_timer_stop (queue->in_timer);
1605
1606     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1607         "queue is full, waiting for free space");
1608     do {
1609       /* Wait for space to be available, we could be unlocked because of a flush. */
1610       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1611     }
1612     while (gst_queue2_is_filled (queue));
1613
1614     /* and continue if we were running before */
1615     if (started)
1616       g_timer_continue (queue->in_timer);
1617   }
1618   return TRUE;
1619
1620   /* ERRORS */
1621 out_flushing:
1622   {
1623     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1624     return FALSE;
1625   }
1626 }
1627
1628 static gboolean
1629 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1630 {
1631   GstMapInfo info;
1632   guint8 *data, *ring_buffer;
1633   guint size, rb_size;
1634   guint64 writing_pos, new_writing_pos;
1635   GstQueue2Range *range, *prev, *next;
1636   gboolean do_seek = FALSE;
1637
1638   if (QUEUE_IS_USING_RING_BUFFER (queue))
1639     writing_pos = queue->current->rb_writing_pos;
1640   else
1641     writing_pos = queue->current->writing_pos;
1642   ring_buffer = queue->ring_buffer;
1643   rb_size = queue->ring_buffer_max_size;
1644
1645   gst_buffer_map (buffer, &info, GST_MAP_READ);
1646
1647   size = info.size;
1648   data = info.data;
1649
1650   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1651       writing_pos);
1652
1653   /* sanity check */
1654   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1655       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1656     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1657         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1658         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1659   }
1660
1661   while (size > 0) {
1662     guint to_write;
1663
1664     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1665       gint64 space;
1666
1667       /* calculate the space in the ring buffer not used by data from
1668        * the current range */
1669       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1670         /* wait until there is some free space */
1671         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1672       }
1673       /* get the amount of space we have */
1674       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1675
1676       /* calculate if we need to split or if we can write the entire
1677        * buffer now */
1678       to_write = MIN (size, space);
1679
1680       /* the writing position in the ring buffer after writing (part
1681        * or all of) the buffer */
1682       new_writing_pos = (writing_pos + to_write) % rb_size;
1683
1684       prev = NULL;
1685       range = queue->ranges;
1686
1687       /* if we need to overwrite data in the ring buffer, we need to
1688        * update the ranges
1689        *
1690        * warning: this code is complicated and includes some
1691        * simplifications - pen, paper and diagrams for the cases
1692        * recommended! */
1693       while (range) {
1694         guint64 range_data_start, range_data_end;
1695         GstQueue2Range *range_to_destroy = NULL;
1696
1697         range_data_start = range->rb_offset;
1698         range_data_end = range->rb_writing_pos;
1699
1700         /* handle the special case where the range has no data in it */
1701         if (range->writing_pos == range->offset) {
1702           if (range != queue->current) {
1703             GST_DEBUG_OBJECT (queue,
1704                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1705                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1706             /* remove range */
1707             range_to_destroy = range;
1708             if (prev)
1709               prev->next = range->next;
1710           }
1711           goto next_range;
1712         }
1713
1714         if (range_data_end > range_data_start) {
1715           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1716             goto next_range;
1717
1718           if (new_writing_pos > range_data_start) {
1719             if (new_writing_pos >= range_data_end) {
1720               GST_DEBUG_OBJECT (queue,
1721                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1722                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
1723               /* remove range */
1724               range_to_destroy = range;
1725               if (prev)
1726                 prev->next = range->next;
1727             } else {
1728               GST_DEBUG_OBJECT (queue,
1729                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
1730                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1731                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1732                   range->offset + new_writing_pos - range_data_start,
1733                   new_writing_pos);
1734               range->offset += (new_writing_pos - range_data_start);
1735               range->rb_offset = new_writing_pos;
1736             }
1737           }
1738         } else {
1739           guint64 new_wpos_virt = writing_pos + to_write;
1740
1741           if (new_wpos_virt <= range_data_start)
1742             goto next_range;
1743
1744           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1745             GST_DEBUG_OBJECT (queue,
1746                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1747                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1748             /* remove range */
1749             range_to_destroy = range;
1750             if (prev)
1751               prev->next = range->next;
1752           } else {
1753             GST_DEBUG_OBJECT (queue,
1754                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1755                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1756                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1757                 range->offset + new_writing_pos - range_data_start,
1758                 new_writing_pos);
1759             range->offset += (new_wpos_virt - range_data_start);
1760             range->rb_offset = new_writing_pos;
1761           }
1762         }
1763
1764       next_range:
1765         if (!range_to_destroy)
1766           prev = range;
1767
1768         range = range->next;
1769         if (range_to_destroy) {
1770           if (range_to_destroy == queue->ranges)
1771             queue->ranges = range;
1772           g_slice_free (GstQueue2Range, range_to_destroy);
1773           range_to_destroy = NULL;
1774         }
1775       }
1776     } else {
1777       to_write = size;
1778       new_writing_pos = writing_pos + to_write;
1779     }
1780
1781     if (QUEUE_IS_USING_TEMP_FILE (queue)
1782         && FSEEK_FILE (queue->temp_file, writing_pos))
1783       goto seek_failed;
1784
1785     if (new_writing_pos > writing_pos) {
1786       GST_INFO_OBJECT (queue,
1787           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1788           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
1789           queue->current->writing_pos, queue->current->rb_writing_pos);
1790       /* either not using ring buffer or no wrapping, just write */
1791       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1792         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
1793           goto handle_error;
1794       } else {
1795         memcpy (ring_buffer + writing_pos, data, to_write);
1796       }
1797
1798       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1799         /* try to merge with next range */
1800         while ((next = queue->current->next)) {
1801           GST_INFO_OBJECT (queue,
1802               "checking merge with next range %" G_GUINT64_FORMAT " < %"
1803               G_GUINT64_FORMAT, new_writing_pos, next->offset);
1804           if (new_writing_pos < next->offset)
1805             break;
1806
1807           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
1808               next->writing_pos);
1809
1810           /* remove the group */
1811           queue->current->next = next->next;
1812
1813           /* We use the threshold to decide if we want to do a seek or simply
1814            * read the data again. If there is not so much data in the range we
1815            * prefer to avoid to seek and read it again. */
1816           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
1817             /* the new range had more data than the threshold, it's worth keeping
1818              * it and doing a seek. */
1819             new_writing_pos = next->writing_pos;
1820             do_seek = TRUE;
1821           }
1822           g_slice_free (GstQueue2Range, next);
1823         }
1824         goto update_and_signal;
1825       }
1826     } else {
1827       /* wrapping */
1828       guint block_one, block_two;
1829
1830       block_one = rb_size - writing_pos;
1831       block_two = to_write - block_one;
1832
1833       if (block_one > 0) {
1834         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
1835         /* write data to end of ring buffer */
1836         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1837           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
1838             goto handle_error;
1839         } else {
1840           memcpy (ring_buffer + writing_pos, data, block_one);
1841         }
1842       }
1843
1844       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
1845         goto seek_failed;
1846
1847       if (block_two > 0) {
1848         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
1849         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1850           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
1851             goto handle_error;
1852         } else {
1853           memcpy (ring_buffer, data + block_one, block_two);
1854         }
1855       }
1856     }
1857
1858   update_and_signal:
1859     /* update the writing positions */
1860     size -= to_write;
1861     GST_INFO_OBJECT (queue,
1862         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
1863         to_write, writing_pos, size);
1864
1865     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1866       data += to_write;
1867       queue->current->writing_pos += to_write;
1868       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
1869     } else {
1870       queue->current->writing_pos = writing_pos = new_writing_pos;
1871     }
1872     if (do_seek)
1873       perform_seek_to_offset (queue, new_writing_pos);
1874
1875     update_cur_level (queue, queue->current);
1876
1877     /* update the buffering status */
1878     if (queue->use_buffering)
1879       update_buffering (queue);
1880
1881     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1882         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1883
1884     GST_QUEUE2_SIGNAL_ADD (queue);
1885   }
1886
1887   gst_buffer_unmap (buffer, &info);
1888
1889   return TRUE;
1890
1891   /* ERRORS */
1892 out_flushing:
1893   {
1894     GST_DEBUG_OBJECT (queue, "we are flushing");
1895     gst_buffer_unmap (buffer, &info);
1896     /* FIXME - GST_FLOW_EOS ? */
1897     return FALSE;
1898   }
1899 seek_failed:
1900   {
1901     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1902     gst_buffer_unmap (buffer, &info);
1903     return FALSE;
1904   }
1905 handle_error:
1906   {
1907     switch (errno) {
1908       case ENOSPC:{
1909         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
1910         break;
1911       }
1912       default:{
1913         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
1914             (_("Error while writing to download file.")),
1915             ("%s", g_strerror (errno)));
1916       }
1917     }
1918     gst_buffer_unmap (buffer, &info);
1919     return FALSE;
1920   }
1921 }
1922
1923 static gboolean
1924 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
1925 {
1926   GstQueue2 *queue = q;
1927
1928   GST_TRACE_OBJECT (queue,
1929       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
1930       gst_buffer_get_size (*buf));
1931
1932   if (!gst_queue2_create_write (queue, *buf)) {
1933     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
1934     return FALSE;
1935   }
1936   return TRUE;
1937 }
1938
1939 static gboolean
1940 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
1941 {
1942   guint *p_size = data;
1943   gsize buf_size;
1944
1945   buf_size = gst_buffer_get_size (*buf);
1946   GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
1947   *p_size += buf_size;
1948   return TRUE;
1949 }
1950
1951 /* enqueue an item an update the level stats */
1952 static void
1953 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
1954     GstQueue2ItemType item_type)
1955 {
1956   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
1957     GstBuffer *buffer;
1958     guint size;
1959
1960     buffer = GST_BUFFER_CAST (item);
1961     size = gst_buffer_get_size (buffer);
1962
1963     /* add buffer to the statistics */
1964     if (QUEUE_IS_USING_QUEUE (queue)) {
1965       queue->cur_level.buffers++;
1966       queue->cur_level.bytes += size;
1967     }
1968     queue->bytes_in += size;
1969
1970     /* apply new buffer to segment stats */
1971     apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
1972     /* update the byterate stats */
1973     update_in_rates (queue);
1974
1975     if (!QUEUE_IS_USING_QUEUE (queue)) {
1976       /* FIXME - check return value? */
1977       gst_queue2_create_write (queue, buffer);
1978     }
1979   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
1980     GstBufferList *buffer_list;
1981     guint size = 0;
1982
1983     buffer_list = GST_BUFFER_LIST_CAST (item);
1984
1985     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
1986     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
1987
1988     /* add buffer to the statistics */
1989     if (QUEUE_IS_USING_QUEUE (queue)) {
1990       queue->cur_level.buffers++;
1991       queue->cur_level.bytes += size;
1992     }
1993     queue->bytes_in += size;
1994
1995     /* apply new buffer to segment stats */
1996     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
1997
1998     /* update the byterate stats */
1999     update_in_rates (queue);
2000
2001     if (!QUEUE_IS_USING_QUEUE (queue)) {
2002       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2003     }
2004   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2005     GstEvent *event;
2006
2007     event = GST_EVENT_CAST (item);
2008
2009     switch (GST_EVENT_TYPE (event)) {
2010       case GST_EVENT_EOS:
2011         /* Zero the thresholds, this makes sure the queue is completely
2012          * filled and we can read all data from the queue. */
2013         GST_DEBUG_OBJECT (queue, "we have EOS");
2014         queue->is_eos = TRUE;
2015         break;
2016       case GST_EVENT_SEGMENT:
2017         apply_segment (queue, event, &queue->sink_segment, TRUE);
2018         /* This is our first new segment, we hold it
2019          * as we can't save it on the temp file */
2020         if (!QUEUE_IS_USING_QUEUE (queue)) {
2021           if (queue->segment_event_received)
2022             goto unexpected_event;
2023
2024           queue->segment_event_received = TRUE;
2025           if (queue->starting_segment != NULL)
2026             gst_event_unref (queue->starting_segment);
2027           queue->starting_segment = event;
2028           item = NULL;
2029         }
2030         /* a new segment allows us to accept more buffers if we got EOS
2031          * from downstream */
2032         queue->unexpected = FALSE;
2033         break;
2034       case GST_EVENT_STREAM_START:
2035         if (!QUEUE_IS_USING_QUEUE (queue)) {
2036           gst_event_replace (&queue->stream_start_event, event);
2037           gst_event_unref (event);
2038           item = NULL;
2039         }
2040         break;
2041       case GST_EVENT_CAPS:{
2042         GstCaps *caps;
2043
2044         gst_event_parse_caps (event, &caps);
2045         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2046
2047         if (!QUEUE_IS_USING_QUEUE (queue)) {
2048           GST_LOG ("Dropping caps event, not using queue");
2049           gst_event_unref (event);
2050           item = NULL;
2051         }
2052         break;
2053       }
2054       default:
2055         if (!QUEUE_IS_USING_QUEUE (queue))
2056           goto unexpected_event;
2057         break;
2058     }
2059   } else if (GST_IS_QUERY (item)) {
2060     /* Can't happen as we check that in the caller */
2061     if (!QUEUE_IS_USING_QUEUE (queue))
2062       g_assert_not_reached ();
2063   } else {
2064     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2065         item, GST_OBJECT_NAME (queue));
2066     /* we can't really unref since we don't know what it is */
2067     item = NULL;
2068   }
2069
2070   if (item) {
2071     /* update the buffering status */
2072     if (queue->use_buffering)
2073       update_buffering (queue);
2074
2075     if (QUEUE_IS_USING_QUEUE (queue)) {
2076       GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2077       qitem->type = item_type;
2078       qitem->item = item;
2079       g_queue_push_tail (&queue->queue, qitem);
2080     } else {
2081       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2082     }
2083
2084     GST_QUEUE2_SIGNAL_ADD (queue);
2085   }
2086
2087   return;
2088
2089   /* ERRORS */
2090 unexpected_event:
2091   {
2092     g_warning
2093         ("Unexpected event of kind %s can't be added in temp file of queue %s ",
2094         gst_event_type_get_name (GST_EVENT_TYPE (item)),
2095         GST_OBJECT_NAME (queue));
2096     gst_event_unref (GST_EVENT_CAST (item));
2097     return;
2098   }
2099 }
2100
2101 /* dequeue an item from the queue and update level stats */
2102 static GstMiniObject *
2103 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2104 {
2105   GstMiniObject *item;
2106
2107   if (!QUEUE_IS_USING_QUEUE (queue)) {
2108     item = gst_queue2_read_item_from_file (queue);
2109   } else {
2110     GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2111
2112     if (qitem == NULL)
2113       goto no_item;
2114
2115     item = qitem->item;
2116     g_slice_free (GstQueue2Item, qitem);
2117   }
2118
2119   if (item == NULL)
2120     goto no_item;
2121
2122   if (GST_IS_BUFFER (item)) {
2123     GstBuffer *buffer;
2124     guint size;
2125
2126     buffer = GST_BUFFER_CAST (item);
2127     size = gst_buffer_get_size (buffer);
2128     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2129
2130     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2131         "retrieved buffer %p from queue", buffer);
2132
2133     if (QUEUE_IS_USING_QUEUE (queue)) {
2134       queue->cur_level.buffers--;
2135       queue->cur_level.bytes -= size;
2136     }
2137     queue->bytes_out += size;
2138
2139     apply_buffer (queue, buffer, &queue->src_segment, FALSE);
2140     /* update the byterate stats */
2141     update_out_rates (queue);
2142     /* update the buffering */
2143     if (queue->use_buffering)
2144       update_buffering (queue);
2145
2146   } else if (GST_IS_EVENT (item)) {
2147     GstEvent *event = GST_EVENT_CAST (item);
2148
2149     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2150
2151     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2152         "retrieved event %p from queue", event);
2153
2154     switch (GST_EVENT_TYPE (event)) {
2155       case GST_EVENT_EOS:
2156         /* queue is empty now that we dequeued the EOS */
2157         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2158         break;
2159       case GST_EVENT_SEGMENT:
2160         apply_segment (queue, event, &queue->src_segment, FALSE);
2161         break;
2162       default:
2163         break;
2164     }
2165   } else if (GST_IS_BUFFER_LIST (item)) {
2166     GstBufferList *buffer_list;
2167     guint size = 0;
2168
2169     buffer_list = GST_BUFFER_LIST_CAST (item);
2170     gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2171     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2172
2173     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2174         "retrieved buffer list %p from queue", buffer_list);
2175
2176     if (QUEUE_IS_USING_QUEUE (queue)) {
2177       queue->cur_level.buffers--;
2178       queue->cur_level.bytes -= size;
2179     }
2180     queue->bytes_out += size;
2181
2182     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2183     /* update the byterate stats */
2184     update_out_rates (queue);
2185     /* update the buffering */
2186     if (queue->use_buffering)
2187       update_buffering (queue);
2188   } else if (GST_IS_QUERY (item)) {
2189     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2190         "retrieved query %p from queue", item);
2191     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2192   } else {
2193     g_warning
2194         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2195         item, GST_OBJECT_NAME (queue));
2196     item = NULL;
2197     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2198   }
2199   GST_QUEUE2_SIGNAL_DEL (queue);
2200
2201   return item;
2202
2203   /* ERRORS */
2204 no_item:
2205   {
2206     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2207     return NULL;
2208   }
2209 }
2210
2211 static gboolean
2212 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2213     GstEvent * event)
2214 {
2215   gboolean ret = TRUE;
2216   GstQueue2 *queue;
2217
2218   queue = GST_QUEUE2 (parent);
2219
2220   switch (GST_EVENT_TYPE (event)) {
2221     case GST_EVENT_FLUSH_START:
2222     {
2223       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2224       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2225         /* forward event */
2226         ret = gst_pad_push_event (queue->srcpad, event);
2227
2228         /* now unblock the chain function */
2229         GST_QUEUE2_MUTEX_LOCK (queue);
2230         queue->srcresult = GST_FLOW_FLUSHING;
2231         queue->sinkresult = GST_FLOW_FLUSHING;
2232         /* unblock the loop and chain functions */
2233         GST_QUEUE2_SIGNAL_ADD (queue);
2234         GST_QUEUE2_SIGNAL_DEL (queue);
2235         queue->last_query = FALSE;
2236         g_cond_signal (&queue->query_handled);
2237         GST_QUEUE2_MUTEX_UNLOCK (queue);
2238
2239         /* make sure it pauses, this should happen since we sent
2240          * flush_start downstream. */
2241         gst_pad_pause_task (queue->srcpad);
2242         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2243       } else {
2244         GST_QUEUE2_MUTEX_LOCK (queue);
2245         /* flush the sink pad */
2246         queue->sinkresult = GST_FLOW_FLUSHING;
2247         GST_QUEUE2_SIGNAL_DEL (queue);
2248         queue->last_query = FALSE;
2249         g_cond_signal (&queue->query_handled);
2250         GST_QUEUE2_MUTEX_UNLOCK (queue);
2251
2252         gst_event_unref (event);
2253       }
2254       break;
2255     }
2256     case GST_EVENT_FLUSH_STOP:
2257     {
2258       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2259
2260       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2261         /* forward event */
2262         ret = gst_pad_push_event (queue->srcpad, event);
2263
2264         GST_QUEUE2_MUTEX_LOCK (queue);
2265         gst_queue2_locked_flush (queue, FALSE, TRUE);
2266         queue->srcresult = GST_FLOW_OK;
2267         queue->sinkresult = GST_FLOW_OK;
2268         queue->is_eos = FALSE;
2269         queue->unexpected = FALSE;
2270         queue->seeking = FALSE;
2271         /* reset rate counters */
2272         reset_rate_timer (queue);
2273         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2274             queue->srcpad, NULL);
2275         GST_QUEUE2_MUTEX_UNLOCK (queue);
2276       } else {
2277         GST_QUEUE2_MUTEX_LOCK (queue);
2278         queue->segment_event_received = FALSE;
2279         queue->is_eos = FALSE;
2280         queue->unexpected = FALSE;
2281         queue->sinkresult = GST_FLOW_OK;
2282         queue->seeking = FALSE;
2283         GST_QUEUE2_MUTEX_UNLOCK (queue);
2284
2285         gst_event_unref (event);
2286       }
2287       break;
2288     }
2289     default:
2290       if (GST_EVENT_IS_SERIALIZED (event)) {
2291         /* serialized events go in the queue */
2292         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2293         if (queue->srcresult != GST_FLOW_OK) {
2294           /* Errors in sticky event pushing are no problem and ignored here
2295            * as they will cause more meaningful errors during data flow.
2296            * For EOS events, that are not followed by data flow, we still
2297            * return FALSE here though and report an error.
2298            */
2299           if (!GST_EVENT_IS_STICKY (event)) {
2300             goto out_flow_error;
2301           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2302             if (queue->srcresult == GST_FLOW_NOT_LINKED
2303                 || queue->srcresult < GST_FLOW_EOS) {
2304               GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2305                   (_("Internal data flow error.")),
2306                   ("streaming task paused, reason %s (%d)",
2307                       gst_flow_get_name (queue->srcresult), queue->srcresult));
2308             }
2309             goto out_flow_error;
2310           }
2311         }
2312         /* refuse more events on EOS */
2313         if (queue->is_eos)
2314           goto out_eos;
2315         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2316         GST_QUEUE2_MUTEX_UNLOCK (queue);
2317       } else {
2318         /* non-serialized events are passed upstream. */
2319         ret = gst_pad_push_event (queue->srcpad, event);
2320       }
2321       break;
2322   }
2323   return ret;
2324
2325   /* ERRORS */
2326 out_flushing:
2327   {
2328     GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2329     GST_QUEUE2_MUTEX_UNLOCK (queue);
2330     gst_event_unref (event);
2331     return FALSE;
2332   }
2333 out_eos:
2334   {
2335     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2336     GST_QUEUE2_MUTEX_UNLOCK (queue);
2337     gst_event_unref (event);
2338     return FALSE;
2339   }
2340 out_flow_error:
2341   {
2342     GST_LOG_OBJECT (queue,
2343         "refusing event, we have a downstream flow error: %s",
2344         gst_flow_get_name (queue->srcresult));
2345     GST_QUEUE2_MUTEX_UNLOCK (queue);
2346     gst_event_unref (event);
2347     return FALSE;
2348   }
2349 }
2350
2351 static gboolean
2352 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2353     GstQuery * query)
2354 {
2355   GstQueue2 *queue;
2356   gboolean res;
2357
2358   queue = GST_QUEUE2 (parent);
2359
2360   switch (GST_QUERY_TYPE (query)) {
2361     default:
2362       if (GST_QUERY_IS_SERIALIZED (query)) {
2363         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2364         /* serialized events go in the queue. We need to be certain that we
2365          * don't cause deadlocks waiting for the query return value. We check if
2366          * the queue is empty (nothing is blocking downstream and the query can
2367          * be pushed for sure) or we are not buffering. If we are buffering,
2368          * the pipeline waits to unblock downstream until our queue fills up
2369          * completely, which can not happen if we block on the query..
2370          * Therefore we only potentially block when we are not buffering. */
2371         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2372         if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2373                 || !queue->use_buffering)) {
2374           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2375             gst_queue2_locked_enqueue (queue, query,
2376                 GST_QUEUE2_ITEM_TYPE_QUERY);
2377
2378             STATUS (queue, queue->sinkpad, "wait for QUERY");
2379             g_cond_wait (&queue->query_handled, &queue->qlock);
2380             if (queue->sinkresult != GST_FLOW_OK)
2381               goto out_flushing;
2382             res = queue->last_query;
2383           } else {
2384             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2385             res = FALSE;
2386           }
2387         } else {
2388           GST_DEBUG_OBJECT (queue,
2389               "refusing query, we are not using the queue");
2390           res = FALSE;
2391         }
2392         GST_QUEUE2_MUTEX_UNLOCK (queue);
2393       } else {
2394         res = gst_pad_query_default (pad, parent, query);
2395       }
2396       break;
2397   }
2398   return res;
2399
2400   /* ERRORS */
2401 out_flushing:
2402   {
2403     GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
2404     GST_QUEUE2_MUTEX_UNLOCK (queue);
2405     return FALSE;
2406   }
2407 }
2408
2409 static gboolean
2410 gst_queue2_is_empty (GstQueue2 * queue)
2411 {
2412   /* never empty on EOS */
2413   if (queue->is_eos)
2414     return FALSE;
2415
2416   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2417     return queue->current->writing_pos <= queue->current->max_reading_pos;
2418   } else {
2419     if (queue->queue.length == 0)
2420       return TRUE;
2421   }
2422
2423   return FALSE;
2424 }
2425
2426 static gboolean
2427 gst_queue2_is_filled (GstQueue2 * queue)
2428 {
2429   gboolean res;
2430
2431   /* always filled on EOS */
2432   if (queue->is_eos)
2433     return TRUE;
2434
2435 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2436     (queue->cur_level.format) >= ((alt_max) ? \
2437       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2438
2439   /* if using a ring buffer we're filled if all ring buffer space is used
2440    * _by the current range_ */
2441   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2442     guint64 rb_size = queue->ring_buffer_max_size;
2443     GST_DEBUG_OBJECT (queue,
2444         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2445         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2446     return CHECK_FILLED (bytes, rb_size);
2447   }
2448
2449   /* if using file, we're never filled if we don't have EOS */
2450   if (QUEUE_IS_USING_TEMP_FILE (queue))
2451     return FALSE;
2452
2453   /* we are never filled when we have no buffers at all */
2454   if (queue->cur_level.buffers == 0)
2455     return FALSE;
2456
2457   /* we are filled if one of the current levels exceeds the max */
2458   res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2459       || CHECK_FILLED (time, 0);
2460
2461   /* if we need to, use the rate estimate to check against the max time we are
2462    * allowed to queue */
2463   if (queue->use_rate_estimate)
2464     res |= CHECK_FILLED (rate_time, 0);
2465
2466 #undef CHECK_FILLED
2467   return res;
2468 }
2469
2470 static GstFlowReturn
2471 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2472     GstMiniObject * item, GstQueue2ItemType item_type)
2473 {
2474   /* we have to lock the queue since we span threads */
2475   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2476   /* when we received EOS, we refuse more data */
2477   if (queue->is_eos)
2478     goto out_eos;
2479   /* when we received unexpected from downstream, refuse more buffers */
2480   if (queue->unexpected)
2481     goto out_unexpected;
2482
2483   /* while we didn't receive the newsegment, we're seeking and we skip data */
2484   if (queue->seeking)
2485     goto out_seeking;
2486
2487   if (!gst_queue2_wait_free_space (queue))
2488     goto out_flushing;
2489
2490   /* put buffer in queue now */
2491   gst_queue2_locked_enqueue (queue, item, item_type);
2492   GST_QUEUE2_MUTEX_UNLOCK (queue);
2493
2494   return GST_FLOW_OK;
2495
2496   /* special conditions */
2497 out_flushing:
2498   {
2499     GstFlowReturn ret = queue->sinkresult;
2500
2501     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2502         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2503     GST_QUEUE2_MUTEX_UNLOCK (queue);
2504     gst_mini_object_unref (item);
2505
2506     return ret;
2507   }
2508 out_eos:
2509   {
2510     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2511     GST_QUEUE2_MUTEX_UNLOCK (queue);
2512     gst_mini_object_unref (item);
2513
2514     return GST_FLOW_EOS;
2515   }
2516 out_seeking:
2517   {
2518     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2519     GST_QUEUE2_MUTEX_UNLOCK (queue);
2520     gst_mini_object_unref (item);
2521
2522     return GST_FLOW_OK;
2523   }
2524 out_unexpected:
2525   {
2526     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2527     GST_QUEUE2_MUTEX_UNLOCK (queue);
2528     gst_mini_object_unref (item);
2529
2530     return GST_FLOW_EOS;
2531   }
2532 }
2533
2534 static GstFlowReturn
2535 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2536 {
2537   GstQueue2 *queue;
2538
2539   queue = GST_QUEUE2 (parent);
2540
2541   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2542       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2543       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2544       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2545       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2546
2547   return gst_queue2_chain_buffer_or_buffer_list (queue,
2548       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2549 }
2550
2551 static GstFlowReturn
2552 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2553     GstBufferList * buffer_list)
2554 {
2555   GstQueue2 *queue;
2556
2557   queue = GST_QUEUE2 (parent);
2558
2559   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2560       "received buffer list %p", buffer_list);
2561
2562   return gst_queue2_chain_buffer_or_buffer_list (queue,
2563       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2564 }
2565
2566 static GstMiniObject *
2567 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2568 {
2569   GstMiniObject *data;
2570
2571   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2572
2573   /* stop pushing buffers, we dequeue all items until we see an item that we
2574    * can push again, which is EOS or SEGMENT. If there is nothing in the
2575    * queue we can push, we set a flag to make the sinkpad refuse more
2576    * buffers with an EOS return value until we receive something
2577    * pushable again or we get flushed. */
2578   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2579     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2580       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2581           "dropping EOS buffer %p", data);
2582       gst_buffer_unref (GST_BUFFER_CAST (data));
2583     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2584       GstEvent *event = GST_EVENT_CAST (data);
2585       GstEventType type = GST_EVENT_TYPE (event);
2586
2587       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2588         /* we found a pushable item in the queue, push it out */
2589         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2590             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2591         return data;
2592       }
2593       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2594           "dropping EOS event %p", event);
2595       gst_event_unref (event);
2596     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2597       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2598           "dropping EOS buffer list %p", data);
2599       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2600     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2601       queue->last_query = FALSE;
2602       g_cond_signal (&queue->query_handled);
2603       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2604     }
2605   }
2606   /* no more items in the queue. Set the unexpected flag so that upstream
2607    * make us refuse any more buffers on the sinkpad. Since we will still
2608    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2609    * task function does not shut down. */
2610   queue->unexpected = TRUE;
2611   return NULL;
2612 }
2613
2614 /* dequeue an item from the queue an push it downstream. This functions returns
2615  * the result of the push. */
2616 static GstFlowReturn
2617 gst_queue2_push_one (GstQueue2 * queue)
2618 {
2619   GstFlowReturn result = queue->srcresult;
2620   GstMiniObject *data;
2621   GstQueue2ItemType item_type;
2622
2623   data = gst_queue2_locked_dequeue (queue, &item_type);
2624   if (data == NULL)
2625     goto no_item;
2626
2627 next:
2628   STATUS (queue, queue->srcpad, "We have something dequeud");
2629   g_atomic_int_set (&queue->downstream_may_block,
2630       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2631       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2632   GST_QUEUE2_MUTEX_UNLOCK (queue);
2633
2634   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2635     GstBuffer *buffer;
2636
2637     buffer = GST_BUFFER_CAST (data);
2638
2639     result = gst_pad_push (queue->srcpad, buffer);
2640     g_atomic_int_set (&queue->downstream_may_block, 0);
2641
2642     /* need to check for srcresult here as well */
2643     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2644     if (result == GST_FLOW_EOS) {
2645       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2646       if (data != NULL)
2647         goto next;
2648       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2649        * to the caller so that the task function does not shut down */
2650       result = GST_FLOW_OK;
2651     }
2652   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2653     GstEvent *event = GST_EVENT_CAST (data);
2654     GstEventType type = GST_EVENT_TYPE (event);
2655
2656     gst_pad_push_event (queue->srcpad, event);
2657
2658     /* if we're EOS, return EOS so that the task pauses. */
2659     if (type == GST_EVENT_EOS) {
2660       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2661           "pushed EOS event %p, return EOS", event);
2662       result = GST_FLOW_EOS;
2663     }
2664
2665     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2666   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2667     GstBufferList *buffer_list;
2668
2669     buffer_list = GST_BUFFER_LIST_CAST (data);
2670
2671     result = gst_pad_push_list (queue->srcpad, buffer_list);
2672     g_atomic_int_set (&queue->downstream_may_block, 0);
2673
2674     /* need to check for srcresult here as well */
2675     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2676     if (result == GST_FLOW_EOS) {
2677       data = gst_queue2_dequeue_on_eos (queue, &item_type);
2678       if (data != NULL)
2679         goto next;
2680       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2681        * to the caller so that the task function does not shut down */
2682       result = GST_FLOW_OK;
2683     }
2684   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2685     GstQuery *query = GST_QUERY_CAST (data);
2686
2687     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2688     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2689     GST_LOG_OBJECT (queue->srcpad, "Peered query");
2690     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2691         "did query %p, return %d", query, queue->last_query);
2692     g_cond_signal (&queue->query_handled);
2693     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2694     result = GST_FLOW_OK;
2695   }
2696   return result;
2697
2698   /* ERRORS */
2699 no_item:
2700   {
2701     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2702         "exit because we have no item in the queue");
2703     return GST_FLOW_ERROR;
2704   }
2705 out_flushing:
2706   {
2707     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2708     return GST_FLOW_FLUSHING;
2709   }
2710 }
2711
2712 /* called repeatedly with @pad as the source pad. This function should push out
2713  * data to the peer element. */
2714 static void
2715 gst_queue2_loop (GstPad * pad)
2716 {
2717   GstQueue2 *queue;
2718   GstFlowReturn ret;
2719
2720   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2721
2722   /* have to lock for thread-safety */
2723   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2724
2725   if (gst_queue2_is_empty (queue)) {
2726     gboolean started;
2727
2728     /* pause the timer while we wait. The fact that we are waiting does not mean
2729      * the byterate on the output pad is lower */
2730     if ((started = queue->out_timer_started))
2731       g_timer_stop (queue->out_timer);
2732
2733     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2734         "queue is empty, waiting for new data");
2735     do {
2736       /* Wait for data to be available, we could be unlocked because of a flush. */
2737       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2738     }
2739     while (gst_queue2_is_empty (queue));
2740
2741     /* and continue if we were running before */
2742     if (started)
2743       g_timer_continue (queue->out_timer);
2744   }
2745   ret = gst_queue2_push_one (queue);
2746   queue->srcresult = ret;
2747   queue->sinkresult = ret;
2748   if (ret != GST_FLOW_OK)
2749     goto out_flushing;
2750
2751   GST_QUEUE2_MUTEX_UNLOCK (queue);
2752
2753   return;
2754
2755   /* ERRORS */
2756 out_flushing:
2757   {
2758     gboolean eos = queue->is_eos;
2759     GstFlowReturn ret = queue->srcresult;
2760
2761     gst_pad_pause_task (queue->srcpad);
2762     GST_QUEUE2_MUTEX_UNLOCK (queue);
2763     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2764         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
2765     /* let app know about us giving up if upstream is not expected to do so */
2766     /* EOS is already taken care of elsewhere */
2767     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
2768       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2769           (_("Internal data flow error.")),
2770           ("streaming task paused, reason %s (%d)",
2771               gst_flow_get_name (ret), ret));
2772       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
2773     }
2774     return;
2775   }
2776 }
2777
2778 static gboolean
2779 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2780 {
2781   gboolean res = TRUE;
2782   GstQueue2 *queue = GST_QUEUE2 (parent);
2783
2784 #ifndef GST_DISABLE_GST_DEBUG
2785   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
2786       event, GST_EVENT_TYPE_NAME (event));
2787 #endif
2788
2789   switch (GST_EVENT_TYPE (event)) {
2790     case GST_EVENT_FLUSH_START:
2791       if (QUEUE_IS_USING_QUEUE (queue)) {
2792         /* just forward upstream */
2793         res = gst_pad_push_event (queue->sinkpad, event);
2794       } else {
2795         /* now unblock the getrange function */
2796         GST_QUEUE2_MUTEX_LOCK (queue);
2797         GST_DEBUG_OBJECT (queue, "flushing");
2798         queue->srcresult = GST_FLOW_FLUSHING;
2799         GST_QUEUE2_SIGNAL_ADD (queue);
2800         GST_QUEUE2_MUTEX_UNLOCK (queue);
2801
2802         /* when using a temp file, we eat the event */
2803         res = TRUE;
2804         gst_event_unref (event);
2805       }
2806       break;
2807     case GST_EVENT_FLUSH_STOP:
2808       if (QUEUE_IS_USING_QUEUE (queue)) {
2809         /* just forward upstream */
2810         res = gst_pad_push_event (queue->sinkpad, event);
2811       } else {
2812         /* now unblock the getrange function */
2813         GST_QUEUE2_MUTEX_LOCK (queue);
2814         queue->srcresult = GST_FLOW_OK;
2815         GST_QUEUE2_MUTEX_UNLOCK (queue);
2816
2817         /* when using a temp file, we eat the event */
2818         res = TRUE;
2819         gst_event_unref (event);
2820       }
2821       break;
2822     case GST_EVENT_RECONFIGURE:
2823       GST_QUEUE2_MUTEX_LOCK (queue);
2824       /* assume downstream is linked now and try to push again */
2825       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
2826         queue->srcresult = GST_FLOW_OK;
2827         queue->sinkresult = GST_FLOW_OK;
2828         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
2829           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
2830               NULL);
2831         }
2832       }
2833       GST_QUEUE2_MUTEX_UNLOCK (queue);
2834
2835       res = gst_pad_push_event (queue->sinkpad, event);
2836       break;
2837     default:
2838       res = gst_pad_push_event (queue->sinkpad, event);
2839       break;
2840   }
2841
2842   return res;
2843 }
2844
2845 static gboolean
2846 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2847 {
2848   GstQueue2 *queue;
2849
2850   queue = GST_QUEUE2 (parent);
2851
2852   switch (GST_QUERY_TYPE (query)) {
2853     case GST_QUERY_POSITION:
2854     {
2855       gint64 peer_pos;
2856       GstFormat format;
2857
2858       if (!gst_pad_peer_query (queue->sinkpad, query))
2859         goto peer_failed;
2860
2861       /* get peer position */
2862       gst_query_parse_position (query, &format, &peer_pos);
2863
2864       /* FIXME: this code assumes that there's no discont in the queue */
2865       switch (format) {
2866         case GST_FORMAT_BYTES:
2867           peer_pos -= queue->cur_level.bytes;
2868           break;
2869         case GST_FORMAT_TIME:
2870           peer_pos -= queue->cur_level.time;
2871           break;
2872         default:
2873           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
2874               "know how to adjust value", gst_format_get_name (format));
2875           return FALSE;
2876       }
2877       /* set updated position */
2878       gst_query_set_position (query, format, peer_pos);
2879       break;
2880     }
2881     case GST_QUERY_DURATION:
2882     {
2883       GST_DEBUG_OBJECT (queue, "doing peer query");
2884
2885       if (!gst_pad_peer_query (queue->sinkpad, query))
2886         goto peer_failed;
2887
2888       GST_DEBUG_OBJECT (queue, "peer query success");
2889       break;
2890     }
2891     case GST_QUERY_BUFFERING:
2892     {
2893       gint percent;
2894       gboolean is_buffering;
2895       GstBufferingMode mode;
2896       gint avg_in, avg_out;
2897       gint64 buffering_left;
2898
2899       GST_DEBUG_OBJECT (queue, "query buffering");
2900
2901       get_buffering_percent (queue, &is_buffering, &percent);
2902       gst_query_set_buffering_percent (query, is_buffering, percent);
2903
2904       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
2905           &buffering_left);
2906       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
2907           buffering_left);
2908
2909       if (!QUEUE_IS_USING_QUEUE (queue)) {
2910         /* add ranges for download and ringbuffer buffering */
2911         GstFormat format;
2912         gint64 start, stop, range_start, range_stop;
2913         guint64 writing_pos;
2914         gint64 estimated_total;
2915         gint64 duration;
2916         gboolean peer_res, is_eos;
2917         GstQueue2Range *queued_ranges;
2918
2919         /* we need a current download region */
2920         if (queue->current == NULL)
2921           return FALSE;
2922
2923         writing_pos = queue->current->writing_pos;
2924         is_eos = queue->is_eos;
2925
2926         if (is_eos) {
2927           /* we're EOS, we know the duration in bytes now */
2928           peer_res = TRUE;
2929           duration = writing_pos;
2930         } else {
2931           /* get duration of upstream in bytes */
2932           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
2933               GST_FORMAT_BYTES, &duration);
2934         }
2935
2936         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
2937             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
2938
2939         /* calculate remaining and total download time */
2940         if (peer_res && avg_in > 0.0)
2941           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
2942         else
2943           estimated_total = -1;
2944
2945         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
2946             estimated_total);
2947
2948         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
2949
2950         switch (format) {
2951           case GST_FORMAT_PERCENT:
2952             /* we need duration */
2953             if (!peer_res)
2954               goto peer_failed;
2955
2956             start = 0;
2957             /* get our available data relative to the duration */
2958             if (duration != -1)
2959               stop =
2960                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
2961                   duration);
2962             else
2963               stop = -1;
2964             break;
2965           case GST_FORMAT_BYTES:
2966             start = 0;
2967             stop = writing_pos;
2968             break;
2969           default:
2970             start = -1;
2971             stop = -1;
2972             break;
2973         }
2974
2975         /* fill out the buffered ranges */
2976         for (queued_ranges = queue->ranges; queued_ranges;
2977             queued_ranges = queued_ranges->next) {
2978           switch (format) {
2979             case GST_FORMAT_PERCENT:
2980               if (duration == -1) {
2981                 range_start = 0;
2982                 range_stop = 0;
2983                 break;
2984               }
2985               range_start =
2986                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
2987                   queued_ranges->offset, duration);
2988               range_stop =
2989                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
2990                   queued_ranges->writing_pos, duration);
2991               break;
2992             case GST_FORMAT_BYTES:
2993               range_start = queued_ranges->offset;
2994               range_stop = queued_ranges->writing_pos;
2995               break;
2996             default:
2997               range_start = -1;
2998               range_stop = -1;
2999               break;
3000           }
3001           if (range_start == range_stop)
3002             continue;
3003           GST_DEBUG_OBJECT (queue,
3004               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3005               G_GINT64_FORMAT, range_start, range_stop);
3006           gst_query_add_buffering_range (query, range_start, range_stop);
3007         }
3008
3009         gst_query_set_buffering_range (query, format, start, stop,
3010             estimated_total);
3011       }
3012       break;
3013     }
3014     case GST_QUERY_SCHEDULING:
3015     {
3016       gboolean pull_mode;
3017       GstSchedulingFlags flags = 0;
3018
3019       if (!gst_pad_peer_query (queue->sinkpad, query))
3020         goto peer_failed;
3021
3022       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3023
3024       /* we can operate in pull mode when we are using a tempfile */
3025       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3026
3027       if (pull_mode)
3028         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3029       gst_query_set_scheduling (query, flags, 0, -1, 0);
3030       if (pull_mode)
3031         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3032       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3033       break;
3034     }
3035     default:
3036       /* peer handled other queries */
3037       if (!gst_pad_query_default (pad, parent, query))
3038         goto peer_failed;
3039       break;
3040   }
3041
3042   return TRUE;
3043
3044   /* ERRORS */
3045 peer_failed:
3046   {
3047     GST_DEBUG_OBJECT (queue, "failed peer query");
3048     return FALSE;
3049   }
3050 }
3051
3052 static gboolean
3053 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3054 {
3055   GstQueue2 *queue = GST_QUEUE2 (element);
3056
3057   /* simply forward to the srcpad query function */
3058   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3059       query);
3060 }
3061
3062 static void
3063 gst_queue2_update_upstream_size (GstQueue2 * queue)
3064 {
3065   gint64 upstream_size = -1;
3066
3067   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3068           &upstream_size)) {
3069     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3070     queue->upstream_size = upstream_size;
3071   }
3072 }
3073
3074 static GstFlowReturn
3075 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3076     guint length, GstBuffer ** buffer)
3077 {
3078   GstQueue2 *queue;
3079   GstFlowReturn ret;
3080
3081   queue = GST_QUEUE2_CAST (parent);
3082
3083   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3084   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3085   offset = (offset == -1) ? queue->current->reading_pos : offset;
3086
3087   GST_DEBUG_OBJECT (queue,
3088       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3089
3090   /* catch any reads beyond the size of the file here to make sure queue2
3091    * doesn't send seek events beyond the size of the file upstream, since
3092    * that would confuse elements such as souphttpsrc and/or http servers.
3093    * Demuxers often just loop until EOS at the end of the file to figure out
3094    * when they've read all the end-headers or index chunks. */
3095   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3096     gst_queue2_update_upstream_size (queue);
3097     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3098       goto out_unexpected;
3099   }
3100
3101   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3102     gst_queue2_update_upstream_size (queue);
3103     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3104       length = queue->upstream_size - offset;
3105       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3106     }
3107   }
3108
3109   /* FIXME - function will block when the range is not yet available */
3110   ret = gst_queue2_create_read (queue, offset, length, buffer);
3111   GST_QUEUE2_MUTEX_UNLOCK (queue);
3112
3113   return ret;
3114
3115   /* ERRORS */
3116 out_flushing:
3117   {
3118     ret = queue->srcresult;
3119
3120     GST_DEBUG_OBJECT (queue, "we are flushing");
3121     GST_QUEUE2_MUTEX_UNLOCK (queue);
3122     return ret;
3123   }
3124 out_unexpected:
3125   {
3126     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3127     GST_QUEUE2_MUTEX_UNLOCK (queue);
3128     return GST_FLOW_EOS;
3129   }
3130 }
3131
3132 /* sink currently only operates in push mode */
3133 static gboolean
3134 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3135     GstPadMode mode, gboolean active)
3136 {
3137   gboolean result;
3138   GstQueue2 *queue;
3139
3140   queue = GST_QUEUE2 (parent);
3141
3142   switch (mode) {
3143     case GST_PAD_MODE_PUSH:
3144       if (active) {
3145         GST_QUEUE2_MUTEX_LOCK (queue);
3146         GST_DEBUG_OBJECT (queue, "activating push mode");
3147         queue->srcresult = GST_FLOW_OK;
3148         queue->sinkresult = GST_FLOW_OK;
3149         queue->is_eos = FALSE;
3150         queue->unexpected = FALSE;
3151         reset_rate_timer (queue);
3152         GST_QUEUE2_MUTEX_UNLOCK (queue);
3153       } else {
3154         /* unblock chain function */
3155         GST_QUEUE2_MUTEX_LOCK (queue);
3156         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3157         queue->srcresult = GST_FLOW_FLUSHING;
3158         queue->sinkresult = GST_FLOW_FLUSHING;
3159         GST_QUEUE2_SIGNAL_DEL (queue);
3160         /* Unblock query handler */
3161         queue->last_query = FALSE;
3162         g_cond_signal (&queue->query_handled);
3163         GST_QUEUE2_MUTEX_UNLOCK (queue);
3164
3165         /* wait until it is unblocked and clean up */
3166         GST_PAD_STREAM_LOCK (pad);
3167         GST_QUEUE2_MUTEX_LOCK (queue);
3168         gst_queue2_locked_flush (queue, TRUE, FALSE);
3169         GST_QUEUE2_MUTEX_UNLOCK (queue);
3170         GST_PAD_STREAM_UNLOCK (pad);
3171       }
3172       result = TRUE;
3173       break;
3174     default:
3175       result = FALSE;
3176       break;
3177   }
3178   return result;
3179 }
3180
3181 /* src operating in push mode, we start a task on the source pad that pushes out
3182  * buffers from the queue */
3183 static gboolean
3184 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3185 {
3186   gboolean result = FALSE;
3187   GstQueue2 *queue;
3188
3189   queue = GST_QUEUE2 (parent);
3190
3191   if (active) {
3192     GST_QUEUE2_MUTEX_LOCK (queue);
3193     GST_DEBUG_OBJECT (queue, "activating push mode");
3194     queue->srcresult = GST_FLOW_OK;
3195     queue->sinkresult = GST_FLOW_OK;
3196     queue->is_eos = FALSE;
3197     queue->unexpected = FALSE;
3198     result =
3199         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3200     GST_QUEUE2_MUTEX_UNLOCK (queue);
3201   } else {
3202     /* unblock loop function */
3203     GST_QUEUE2_MUTEX_LOCK (queue);
3204     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3205     queue->srcresult = GST_FLOW_FLUSHING;
3206     queue->sinkresult = GST_FLOW_FLUSHING;
3207     /* the item add signal will unblock */
3208     GST_QUEUE2_SIGNAL_ADD (queue);
3209     GST_QUEUE2_MUTEX_UNLOCK (queue);
3210
3211     /* step 2, make sure streaming finishes */
3212     result = gst_pad_stop_task (pad);
3213   }
3214
3215   return result;
3216 }
3217
3218 /* pull mode, downstream will call our getrange function */
3219 static gboolean
3220 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3221 {
3222   gboolean result;
3223   GstQueue2 *queue;
3224
3225   queue = GST_QUEUE2 (parent);
3226
3227   if (active) {
3228     GST_QUEUE2_MUTEX_LOCK (queue);
3229     if (!QUEUE_IS_USING_QUEUE (queue)) {
3230       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3231         /* open the temp file now */
3232         result = gst_queue2_open_temp_location_file (queue);
3233       } else if (!queue->ring_buffer) {
3234         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3235         result = ! !queue->ring_buffer;
3236       } else {
3237         result = TRUE;
3238       }
3239
3240       GST_DEBUG_OBJECT (queue, "activating pull mode");
3241       init_ranges (queue);
3242       queue->srcresult = GST_FLOW_OK;
3243       queue->sinkresult = GST_FLOW_OK;
3244       queue->is_eos = FALSE;
3245       queue->unexpected = FALSE;
3246       queue->upstream_size = 0;
3247     } else {
3248       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3249       /* this is not allowed, we cannot operate in pull mode without a temp
3250        * file. */
3251       queue->srcresult = GST_FLOW_FLUSHING;
3252       queue->sinkresult = GST_FLOW_FLUSHING;
3253       result = FALSE;
3254     }
3255     GST_QUEUE2_MUTEX_UNLOCK (queue);
3256   } else {
3257     GST_QUEUE2_MUTEX_LOCK (queue);
3258     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3259     queue->srcresult = GST_FLOW_FLUSHING;
3260     queue->sinkresult = GST_FLOW_FLUSHING;
3261     /* this will unlock getrange */
3262     GST_QUEUE2_SIGNAL_ADD (queue);
3263     result = TRUE;
3264     GST_QUEUE2_MUTEX_UNLOCK (queue);
3265   }
3266
3267   return result;
3268 }
3269
3270 static gboolean
3271 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3272     gboolean active)
3273 {
3274   gboolean res;
3275
3276   switch (mode) {
3277     case GST_PAD_MODE_PULL:
3278       res = gst_queue2_src_activate_pull (pad, parent, active);
3279       break;
3280     case GST_PAD_MODE_PUSH:
3281       res = gst_queue2_src_activate_push (pad, parent, active);
3282       break;
3283     default:
3284       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3285       res = FALSE;
3286       break;
3287   }
3288   return res;
3289 }
3290
3291 static GstStateChangeReturn
3292 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3293 {
3294   GstQueue2 *queue;
3295   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3296
3297   queue = GST_QUEUE2 (element);
3298
3299   switch (transition) {
3300     case GST_STATE_CHANGE_NULL_TO_READY:
3301       break;
3302     case GST_STATE_CHANGE_READY_TO_PAUSED:
3303       GST_QUEUE2_MUTEX_LOCK (queue);
3304       if (!QUEUE_IS_USING_QUEUE (queue)) {
3305         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3306           if (!gst_queue2_open_temp_location_file (queue))
3307             ret = GST_STATE_CHANGE_FAILURE;
3308         } else {
3309           if (queue->ring_buffer) {
3310             g_free (queue->ring_buffer);
3311             queue->ring_buffer = NULL;
3312           }
3313           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3314             ret = GST_STATE_CHANGE_FAILURE;
3315         }
3316         init_ranges (queue);
3317       }
3318       queue->segment_event_received = FALSE;
3319       queue->starting_segment = NULL;
3320       gst_event_replace (&queue->stream_start_event, NULL);
3321       GST_QUEUE2_MUTEX_UNLOCK (queue);
3322       break;
3323     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3324       break;
3325     default:
3326       break;
3327   }
3328
3329   if (ret == GST_STATE_CHANGE_FAILURE)
3330     return ret;
3331
3332   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3333
3334   if (ret == GST_STATE_CHANGE_FAILURE)
3335     return ret;
3336
3337   switch (transition) {
3338     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3339       break;
3340     case GST_STATE_CHANGE_PAUSED_TO_READY:
3341       GST_QUEUE2_MUTEX_LOCK (queue);
3342       if (!QUEUE_IS_USING_QUEUE (queue)) {
3343         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3344           gst_queue2_close_temp_location_file (queue);
3345         } else if (queue->ring_buffer) {
3346           g_free (queue->ring_buffer);
3347           queue->ring_buffer = NULL;
3348         }
3349         clean_ranges (queue);
3350       }
3351       if (queue->starting_segment != NULL) {
3352         gst_event_unref (queue->starting_segment);
3353         queue->starting_segment = NULL;
3354       }
3355       gst_event_replace (&queue->stream_start_event, NULL);
3356       GST_QUEUE2_MUTEX_UNLOCK (queue);
3357       break;
3358     case GST_STATE_CHANGE_READY_TO_NULL:
3359       break;
3360     default:
3361       break;
3362   }
3363
3364   return ret;
3365 }
3366
3367 /* changing the capacity of the queue must wake up
3368  * the _chain function, it might have more room now
3369  * to store the buffer/event in the queue */
3370 #define QUEUE_CAPACITY_CHANGE(q) \
3371   GST_QUEUE2_SIGNAL_DEL (queue); \
3372   if (queue->use_buffering)      \
3373     update_buffering (queue);
3374
3375 /* Changing the minimum required fill level must
3376  * wake up the _loop function as it might now
3377  * be able to preceed.
3378  */
3379 #define QUEUE_THRESHOLD_CHANGE(q)\
3380   GST_QUEUE2_SIGNAL_ADD (queue);
3381
3382 static void
3383 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3384 {
3385   GstState state;
3386
3387   /* the element must be stopped in order to do this */
3388   GST_OBJECT_LOCK (queue);
3389   state = GST_STATE (queue);
3390   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3391     goto wrong_state;
3392   GST_OBJECT_UNLOCK (queue);
3393
3394   /* set new location */
3395   g_free (queue->temp_template);
3396   queue->temp_template = g_strdup (template);
3397
3398   return;
3399
3400 /* ERROR */
3401 wrong_state:
3402   {
3403     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3404     GST_OBJECT_UNLOCK (queue);
3405   }
3406 }
3407
3408 static void
3409 gst_queue2_set_property (GObject * object,
3410     guint prop_id, const GValue * value, GParamSpec * pspec)
3411 {
3412   GstQueue2 *queue = GST_QUEUE2 (object);
3413
3414   /* someone could change levels here, and since this
3415    * affects the get/put funcs, we need to lock for safety. */
3416   GST_QUEUE2_MUTEX_LOCK (queue);
3417
3418   switch (prop_id) {
3419     case PROP_MAX_SIZE_BYTES:
3420       queue->max_level.bytes = g_value_get_uint (value);
3421       QUEUE_CAPACITY_CHANGE (queue);
3422       break;
3423     case PROP_MAX_SIZE_BUFFERS:
3424       queue->max_level.buffers = g_value_get_uint (value);
3425       QUEUE_CAPACITY_CHANGE (queue);
3426       break;
3427     case PROP_MAX_SIZE_TIME:
3428       queue->max_level.time = g_value_get_uint64 (value);
3429       /* set rate_time to the same value. We use an extra field in the level
3430        * structure so that we can easily access and compare it */
3431       queue->max_level.rate_time = queue->max_level.time;
3432       QUEUE_CAPACITY_CHANGE (queue);
3433       break;
3434     case PROP_USE_BUFFERING:
3435       queue->use_buffering = g_value_get_boolean (value);
3436       if (!queue->use_buffering && queue->is_buffering) {
3437         GstMessage *msg = gst_message_new_buffering (GST_OBJECT_CAST (queue),
3438             100);
3439
3440         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3441             "posting 100%% message");
3442         queue->is_buffering = FALSE;
3443         gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
3444       }
3445
3446       if (queue->use_buffering) {
3447         queue->is_buffering = TRUE;
3448         update_buffering (queue);
3449       }
3450       break;
3451     case PROP_USE_RATE_ESTIMATE:
3452       queue->use_rate_estimate = g_value_get_boolean (value);
3453       break;
3454     case PROP_LOW_PERCENT:
3455       queue->low_percent = g_value_get_int (value);
3456       break;
3457     case PROP_HIGH_PERCENT:
3458       queue->high_percent = g_value_get_int (value);
3459       break;
3460     case PROP_TEMP_TEMPLATE:
3461       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3462       break;
3463     case PROP_TEMP_REMOVE:
3464       queue->temp_remove = g_value_get_boolean (value);
3465       break;
3466     case PROP_RING_BUFFER_MAX_SIZE:
3467       queue->ring_buffer_max_size = g_value_get_uint64 (value);
3468       break;
3469     default:
3470       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3471       break;
3472   }
3473
3474   GST_QUEUE2_MUTEX_UNLOCK (queue);
3475 }
3476
3477 static void
3478 gst_queue2_get_property (GObject * object,
3479     guint prop_id, GValue * value, GParamSpec * pspec)
3480 {
3481   GstQueue2 *queue = GST_QUEUE2 (object);
3482
3483   GST_QUEUE2_MUTEX_LOCK (queue);
3484
3485   switch (prop_id) {
3486     case PROP_CUR_LEVEL_BYTES:
3487       g_value_set_uint (value, queue->cur_level.bytes);
3488       break;
3489     case PROP_CUR_LEVEL_BUFFERS:
3490       g_value_set_uint (value, queue->cur_level.buffers);
3491       break;
3492     case PROP_CUR_LEVEL_TIME:
3493       g_value_set_uint64 (value, queue->cur_level.time);
3494       break;
3495     case PROP_MAX_SIZE_BYTES:
3496       g_value_set_uint (value, queue->max_level.bytes);
3497       break;
3498     case PROP_MAX_SIZE_BUFFERS:
3499       g_value_set_uint (value, queue->max_level.buffers);
3500       break;
3501     case PROP_MAX_SIZE_TIME:
3502       g_value_set_uint64 (value, queue->max_level.time);
3503       break;
3504     case PROP_USE_BUFFERING:
3505       g_value_set_boolean (value, queue->use_buffering);
3506       break;
3507     case PROP_USE_RATE_ESTIMATE:
3508       g_value_set_boolean (value, queue->use_rate_estimate);
3509       break;
3510     case PROP_LOW_PERCENT:
3511       g_value_set_int (value, queue->low_percent);
3512       break;
3513     case PROP_HIGH_PERCENT:
3514       g_value_set_int (value, queue->high_percent);
3515       break;
3516     case PROP_TEMP_TEMPLATE:
3517       g_value_set_string (value, queue->temp_template);
3518       break;
3519     case PROP_TEMP_LOCATION:
3520       g_value_set_string (value, queue->temp_location);
3521       break;
3522     case PROP_TEMP_REMOVE:
3523       g_value_set_boolean (value, queue->temp_remove);
3524       break;
3525     case PROP_RING_BUFFER_MAX_SIZE:
3526       g_value_set_uint64 (value, queue->ring_buffer_max_size);
3527       break;
3528     default:
3529       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3530       break;
3531   }
3532
3533   GST_QUEUE2_MUTEX_UNLOCK (queue);
3534 }