2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2003 Colin Walters <cwalters@gnome.org>
4 * 2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5 * 2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6 * SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
27 * SECTION:element-queue2
29 * Data is queued until one of the limits specified by the
30 * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31 * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32 * more buffers into the queue will block the pushing thread until more space
35 * The queue will create a new thread on the source pad to decouple the
36 * processing on sink and source pad.
38 * You can query how many buffers are queued by reading the
39 * #GstQueue2:current-level-buffers property.
41 * The default queue size limits are 100 buffers, 2MB of data, or
42 * two seconds worth of data, whichever is reached first.
44 * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
45 * will allocate a random free filename and buffer data in the file.
46 * By using this, it will buffer the entire stream data on the file independently
47 * of the queue size limits, they will only be used for buffering statistics.
49 * The temp-location property will be used to notify the application of the
57 #include "gstqueue2.h"
59 #include <glib/gstdio.h>
61 #include "gst/gst-i18n-lib.h"
62 #include "gst/glib-compat-private.h"
67 #include <io.h> /* lseek, open, close, read */
69 #define lseek _lseeki64
76 #ifdef __BIONIC__ /* Android */
84 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
89 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
94 GST_DEBUG_CATEGORY_STATIC (queue_debug);
95 #define GST_CAT_DEFAULT (queue_debug)
96 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
104 #define DEFAULT_BUFFER_SIZE 4096
105 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
106 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0) /* for consistency with the above macro */
107 #define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
109 #define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
111 /* default property values */
112 #define DEFAULT_MAX_SIZE_BUFFERS 100 /* 100 buffers */
113 #define DEFAULT_MAX_SIZE_BYTES (2 * 1024 * 1024) /* 2 MB */
114 #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND /* 2 seconds */
115 #define DEFAULT_USE_BUFFERING FALSE
116 #define DEFAULT_USE_TAGS_BITRATE FALSE
117 #define DEFAULT_USE_RATE_ESTIMATE TRUE
118 #define DEFAULT_LOW_PERCENT 10
119 #define DEFAULT_HIGH_PERCENT 99
120 #define DEFAULT_TEMP_REMOVE TRUE
121 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
126 PROP_CUR_LEVEL_BUFFERS,
127 PROP_CUR_LEVEL_BYTES,
129 PROP_MAX_SIZE_BUFFERS,
133 PROP_USE_TAGS_BITRATE,
134 PROP_USE_RATE_ESTIMATE,
140 PROP_RING_BUFFER_MAX_SIZE,
145 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START { \
152 #define STATUS(queue, pad, msg) \
153 GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
154 "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
155 "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
156 " ns, %"G_GUINT64_FORMAT" items", \
157 GST_DEBUG_PAD_NAME (pad), \
158 queue->cur_level.buffers, \
159 queue->max_level.buffers, \
160 queue->cur_level.bytes, \
161 queue->max_level.bytes, \
162 queue->cur_level.time, \
163 queue->max_level.time, \
164 (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
165 queue->current->writing_pos - queue->current->max_reading_pos : \
166 queue->queue.length))
168 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \
169 g_mutex_lock (&q->qlock); \
172 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START { \
173 GST_QUEUE2_MUTEX_LOCK (q); \
174 if (res != GST_FLOW_OK) \
178 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START { \
179 g_mutex_unlock (&q->qlock); \
182 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START { \
183 STATUS (queue, q->sinkpad, "wait for DEL"); \
184 q->waiting_del = TRUE; \
185 g_cond_wait (&q->item_del, &queue->qlock); \
186 q->waiting_del = FALSE; \
187 if (res != GST_FLOW_OK) { \
188 STATUS (queue, q->srcpad, "received DEL wakeup"); \
191 STATUS (queue, q->sinkpad, "received DEL"); \
194 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START { \
195 STATUS (queue, q->srcpad, "wait for ADD"); \
196 q->waiting_add = TRUE; \
197 g_cond_wait (&q->item_add, &q->qlock); \
198 q->waiting_add = FALSE; \
199 if (res != GST_FLOW_OK) { \
200 STATUS (queue, q->srcpad, "received ADD wakeup"); \
203 STATUS (queue, q->srcpad, "received ADD"); \
206 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START { \
207 if (q->waiting_del) { \
208 STATUS (q, q->srcpad, "signal DEL"); \
209 g_cond_signal (&q->item_del); \
213 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START { \
214 if (q->waiting_add) { \
215 STATUS (q, q->sinkpad, "signal ADD"); \
216 g_cond_signal (&q->item_add); \
220 #define SET_PERCENT(q, perc) G_STMT_START { \
221 if (perc != q->buffering_percent) { \
222 q->buffering_percent = perc; \
223 q->percent_changed = TRUE; \
224 GST_DEBUG_OBJECT (q, "buffering %d percent", perc); \
225 get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out, \
226 &q->buffering_left); \
231 GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
232 GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
233 "dataflow inside the queue element");
234 #define gst_queue2_parent_class parent_class
235 G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
237 static void gst_queue2_finalize (GObject * object);
239 static void gst_queue2_set_property (GObject * object,
240 guint prop_id, const GValue * value, GParamSpec * pspec);
241 static void gst_queue2_get_property (GObject * object,
242 guint prop_id, GValue * value, GParamSpec * pspec);
244 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
246 static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
247 GstBufferList * buffer_list);
248 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
249 static void gst_queue2_loop (GstPad * pad);
251 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
253 static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
256 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
258 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
260 static gboolean gst_queue2_handle_query (GstElement * element,
263 static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
264 guint64 offset, guint length, GstBuffer ** buffer);
266 static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
267 GstPadMode mode, gboolean active);
268 static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
269 GstPadMode mode, gboolean active);
270 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
271 GstStateChange transition);
273 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
274 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
276 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
277 static void update_in_rates (GstQueue2 * queue, gboolean force);
278 static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue);
279 static void gst_queue2_post_buffering (GstQueue2 * queue);
283 GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
284 GST_QUEUE2_ITEM_TYPE_BUFFER,
285 GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
286 GST_QUEUE2_ITEM_TYPE_EVENT,
287 GST_QUEUE2_ITEM_TYPE_QUERY
292 GstQueue2ItemType type;
296 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
299 gst_queue2_class_init (GstQueue2Class * klass)
301 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
302 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
304 gobject_class->set_property = gst_queue2_set_property;
305 gobject_class->get_property = gst_queue2_get_property;
308 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
309 g_param_spec_uint ("current-level-bytes", "Current level (kB)",
310 "Current amount of data in the queue (bytes)",
311 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
312 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
313 g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
314 "Current number of buffers in the queue",
315 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
316 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
317 g_param_spec_uint64 ("current-level-time", "Current level (ns)",
318 "Current amount of data in the queue (in ns)",
319 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
321 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
322 g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
323 "Max. amount of data in the queue (bytes, 0=disable)",
324 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
325 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
326 G_PARAM_STATIC_STRINGS));
327 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
328 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
329 "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
330 DEFAULT_MAX_SIZE_BUFFERS,
331 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
332 G_PARAM_STATIC_STRINGS));
333 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
334 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
335 "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
336 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
337 G_PARAM_STATIC_STRINGS));
339 g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
340 g_param_spec_boolean ("use-buffering", "Use buffering",
341 "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
342 DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
343 G_PARAM_STATIC_STRINGS));
344 g_object_class_install_property (gobject_class, PROP_USE_TAGS_BITRATE,
345 g_param_spec_boolean ("use-tags-bitrate", "Use bitrate from tags",
346 "Use a bitrate from upstream tags to estimate buffer duration if not provided",
347 DEFAULT_USE_TAGS_BITRATE,
348 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
349 G_PARAM_STATIC_STRINGS));
350 g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
351 g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
352 "Estimate the bitrate of the stream to calculate time level",
353 DEFAULT_USE_RATE_ESTIMATE,
354 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
355 g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
356 g_param_spec_int ("low-percent", "Low percent",
357 "Low threshold for buffering to start. Only used if use-buffering is True",
358 0, 100, DEFAULT_LOW_PERCENT,
359 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
360 g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
361 g_param_spec_int ("high-percent", "High percent",
362 "High threshold for buffering to finish. Only used if use-buffering is True",
363 0, 100, DEFAULT_HIGH_PERCENT,
364 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
366 g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
367 g_param_spec_string ("temp-template", "Temporary File Template",
368 "File template to store temporary files in, should contain directory "
369 "and XXXXXX. (NULL == disabled)",
370 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
372 g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
373 g_param_spec_string ("temp-location", "Temporary File Location",
374 "Location to store temporary files in (Only read this property, "
375 "use temp-template to configure the name template)",
376 NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
379 * GstQueue2:temp-remove
381 * When temp-template is set, remove the temporary file when going to READY.
383 g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
384 g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
385 "Remove the temp-location after use",
386 DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
389 * GstQueue2:ring-buffer-max-size
391 * The maximum size of the ring buffer in bytes. If set to 0, the ring
392 * buffer is disabled. Default 0.
394 g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
395 g_param_spec_uint64 ("ring-buffer-max-size",
396 "Max. ring buffer size (bytes)",
397 "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
398 0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
399 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
402 * GstQueue2:avg-in-rate
404 * The average input data rate.
406 g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
407 g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
408 "Average input data rate (bytes/s)",
409 0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
411 /* set several parent class virtual functions */
412 gobject_class->finalize = gst_queue2_finalize;
414 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
415 gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
417 gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
420 "Erik Walthinsen <omega@cse.ogi.edu>, "
421 "Wim Taymans <wim.taymans@gmail.com>");
423 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
424 gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
428 gst_queue2_init (GstQueue2 * queue)
430 queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
432 gst_pad_set_chain_function (queue->sinkpad,
433 GST_DEBUG_FUNCPTR (gst_queue2_chain));
434 gst_pad_set_chain_list_function (queue->sinkpad,
435 GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
436 gst_pad_set_activatemode_function (queue->sinkpad,
437 GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
438 gst_pad_set_event_function (queue->sinkpad,
439 GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
440 gst_pad_set_query_function (queue->sinkpad,
441 GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
442 GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
443 gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
445 queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
447 gst_pad_set_activatemode_function (queue->srcpad,
448 GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
449 gst_pad_set_getrange_function (queue->srcpad,
450 GST_DEBUG_FUNCPTR (gst_queue2_get_range));
451 gst_pad_set_event_function (queue->srcpad,
452 GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
453 gst_pad_set_query_function (queue->srcpad,
454 GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
455 GST_PAD_SET_PROXY_CAPS (queue->srcpad);
456 gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
459 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
460 queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
461 queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
462 queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
463 queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
464 queue->use_buffering = DEFAULT_USE_BUFFERING;
465 queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
466 queue->low_percent = DEFAULT_LOW_PERCENT;
467 queue->high_percent = DEFAULT_HIGH_PERCENT;
469 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
470 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
472 queue->sinktime = GST_CLOCK_TIME_NONE;
473 queue->srctime = GST_CLOCK_TIME_NONE;
474 queue->sink_tainted = TRUE;
475 queue->src_tainted = TRUE;
477 queue->srcresult = GST_FLOW_FLUSHING;
478 queue->sinkresult = GST_FLOW_FLUSHING;
479 queue->is_eos = FALSE;
480 queue->in_timer = g_timer_new ();
481 queue->out_timer = g_timer_new ();
483 g_mutex_init (&queue->qlock);
484 queue->waiting_add = FALSE;
485 g_cond_init (&queue->item_add);
486 queue->waiting_del = FALSE;
487 g_cond_init (&queue->item_del);
488 g_queue_init (&queue->queue);
490 g_cond_init (&queue->query_handled);
491 queue->last_query = FALSE;
493 g_mutex_init (&queue->buffering_post_lock);
494 queue->buffering_percent = 100;
496 /* tempfile related */
497 queue->temp_template = NULL;
498 queue->temp_location = NULL;
499 queue->temp_remove = DEFAULT_TEMP_REMOVE;
501 queue->ring_buffer = NULL;
502 queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
504 GST_DEBUG_OBJECT (queue,
505 "initialized queue's not_empty & not_full conditions");
508 /* called only once, as opposed to dispose */
510 gst_queue2_finalize (GObject * object)
512 GstQueue2 *queue = GST_QUEUE2 (object);
514 GST_DEBUG_OBJECT (queue, "finalizing queue");
516 while (!g_queue_is_empty (&queue->queue)) {
517 GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
519 if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
520 gst_mini_object_unref (qitem->item);
521 g_slice_free (GstQueue2Item, qitem);
524 queue->last_query = FALSE;
525 g_queue_clear (&queue->queue);
526 g_mutex_clear (&queue->qlock);
527 g_mutex_clear (&queue->buffering_post_lock);
528 g_cond_clear (&queue->item_add);
529 g_cond_clear (&queue->item_del);
530 g_cond_clear (&queue->query_handled);
531 g_timer_destroy (queue->in_timer);
532 g_timer_destroy (queue->out_timer);
534 /* temp_file path cleanup */
535 g_free (queue->temp_template);
536 g_free (queue->temp_location);
538 G_OBJECT_CLASS (parent_class)->finalize (object);
542 debug_ranges (GstQueue2 * queue)
544 GstQueue2Range *walk;
546 for (walk = queue->ranges; walk; walk = walk->next) {
547 GST_DEBUG_OBJECT (queue,
548 "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
549 G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
550 " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
551 walk->rb_writing_pos, walk->reading_pos,
552 walk == queue->current ? "**y**" : " n ");
556 /* clear all the downloaded ranges */
558 clean_ranges (GstQueue2 * queue)
560 GST_DEBUG_OBJECT (queue, "clean queue ranges");
562 g_slice_free_chain (GstQueue2Range, queue->ranges, next);
563 queue->ranges = NULL;
564 queue->current = NULL;
567 /* find a range that contains @offset or NULL when nothing does */
568 static GstQueue2Range *
569 find_range (GstQueue2 * queue, guint64 offset)
571 GstQueue2Range *range = NULL;
572 GstQueue2Range *walk;
574 /* first do a quick check for the current range */
575 for (walk = queue->ranges; walk; walk = walk->next) {
576 if (offset >= walk->offset && offset <= walk->writing_pos) {
577 /* we can reuse an existing range */
583 GST_DEBUG_OBJECT (queue,
584 "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
585 G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
587 GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
593 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
595 guint64 max_reading_pos, writing_pos;
597 writing_pos = range->writing_pos;
598 max_reading_pos = range->max_reading_pos;
600 if (writing_pos > max_reading_pos)
601 queue->cur_level.bytes = writing_pos - max_reading_pos;
603 queue->cur_level.bytes = 0;
606 /* make a new range for @offset or reuse an existing range */
607 static GstQueue2Range *
608 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
610 GstQueue2Range *range, *prev, *next;
612 GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
614 if ((range = find_range (queue, offset))) {
615 GST_DEBUG_OBJECT (queue,
616 "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
618 if (update_existing && range->writing_pos != offset) {
619 GST_DEBUG_OBJECT (queue, "updating range writing position to "
620 "%" G_GUINT64_FORMAT, offset);
621 range->writing_pos = offset;
624 GST_DEBUG_OBJECT (queue,
625 "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
627 range = g_slice_new0 (GstQueue2Range);
628 range->offset = offset;
629 /* we want to write to the next location in the ring buffer */
630 range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
631 range->writing_pos = offset;
632 range->rb_writing_pos = range->rb_offset;
633 range->reading_pos = offset;
634 range->max_reading_pos = offset;
638 next = queue->ranges;
640 if (next->offset > offset) {
641 /* insert before next */
642 GST_DEBUG_OBJECT (queue,
643 "insert before range %p, offset %" G_GUINT64_FORMAT, next,
655 queue->ranges = range;
657 debug_ranges (queue);
659 /* update the stats for this range */
660 update_cur_level (queue, range);
666 /* clear and init the download ranges for offset 0 */
668 init_ranges (GstQueue2 * queue)
670 GST_DEBUG_OBJECT (queue, "init queue ranges");
672 /* get rid of all the current ranges */
673 clean_ranges (queue);
674 /* make a range for offset 0 */
675 queue->current = add_range (queue, 0, TRUE);
678 /* calculate the diff between running time on the sink and src of the queue.
679 * This is the total amount of time in the queue. */
681 update_time_level (GstQueue2 * queue)
683 if (queue->sink_tainted) {
685 gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
686 queue->sink_segment.position);
687 queue->sink_tainted = FALSE;
690 if (queue->src_tainted) {
692 gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
693 queue->src_segment.position);
694 queue->src_tainted = FALSE;
697 GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
698 GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
700 if (queue->sinktime != GST_CLOCK_TIME_NONE
701 && queue->srctime != GST_CLOCK_TIME_NONE
702 && queue->sinktime >= queue->srctime)
703 queue->cur_level.time = queue->sinktime - queue->srctime;
705 queue->cur_level.time = 0;
708 /* take a SEGMENT event and apply the values to segment, updating the time
711 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
714 gst_event_copy_segment (event, segment);
716 if (segment->format == GST_FORMAT_BYTES) {
717 if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
718 /* start is where we'll be getting from and as such writing next */
719 queue->current = add_range (queue, segment->start, TRUE);
723 /* now configure the values, we use these to track timestamps on the
725 if (segment->format != GST_FORMAT_TIME) {
726 /* non-time format, pretend the current time segment is closed with a
727 * 0 start and unknown stop time. */
728 segment->format = GST_FORMAT_TIME;
734 GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
737 queue->sink_tainted = TRUE;
739 queue->src_tainted = TRUE;
741 /* segment can update the time level of the queue */
742 update_time_level (queue);
746 apply_gap (GstQueue2 * queue, GstEvent * event,
747 GstSegment * segment, gboolean is_sink)
749 GstClockTime timestamp;
750 GstClockTime duration;
752 gst_event_parse_gap (event, ×tamp, &duration);
754 if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
756 if (GST_CLOCK_TIME_IS_VALID (duration)) {
757 timestamp += duration;
760 segment->position = timestamp;
763 queue->sink_tainted = TRUE;
765 queue->src_tainted = TRUE;
767 /* calc diff with other end */
768 update_time_level (queue);
772 /* take a buffer and update segment, updating the time level of the queue. */
774 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
775 guint64 size, gboolean is_sink)
777 GstClockTime duration, timestamp;
779 timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
780 duration = GST_BUFFER_DURATION (buffer);
782 /* If we have no duration, pick one from the bitrate if we can */
783 if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) {
785 is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
787 duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
790 /* if no timestamp is set, assume it's continuous with the previous
792 if (timestamp == GST_CLOCK_TIME_NONE)
793 timestamp = segment->position;
796 if (duration != GST_CLOCK_TIME_NONE)
797 timestamp += duration;
799 GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
800 GST_TIME_ARGS (timestamp));
802 segment->position = timestamp;
805 queue->sink_tainted = TRUE;
807 queue->src_tainted = TRUE;
809 /* calc diff with other end */
810 update_time_level (queue);
815 GstClockTime timestamp;
820 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
822 struct BufListData *bld = data;
823 GstClockTime *timestamp = &bld->timestamp;
826 GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
827 " duration %" GST_TIME_FORMAT, idx,
828 GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
829 GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
830 GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
832 btime = GST_BUFFER_DTS_OR_PTS (*buf);
833 if (GST_CLOCK_TIME_IS_VALID (btime))
836 if (GST_BUFFER_DURATION_IS_VALID (*buf))
837 *timestamp += GST_BUFFER_DURATION (*buf);
838 else if (bld->bitrate != 0) {
839 guint64 size = gst_buffer_get_size (*buf);
841 /* If we have no duration, pick one from the bitrate if we can */
842 *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size);
846 GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
850 /* take a buffer list and update segment, updating the time level of the queue */
852 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
853 GstSegment * segment, gboolean is_sink)
855 struct BufListData bld;
857 /* if no timestamp is set, assume it's continuous with the previous time */
858 bld.timestamp = segment->position;
860 if (queue->use_tags_bitrate) {
862 bld.bitrate = queue->sink_tags_bitrate;
864 bld.bitrate = queue->src_tags_bitrate;
868 gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
870 GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
871 GST_TIME_ARGS (bld.timestamp));
873 segment->position = bld.timestamp;
876 queue->sink_tainted = TRUE;
878 queue->src_tainted = TRUE;
880 /* calc diff with other end */
881 update_time_level (queue);
885 get_percent (guint64 cur_level, guint64 max_level, guint64 alt_max)
893 p = gst_util_uint64_scale (cur_level, 100, MIN (max_level, alt_max));
895 p = gst_util_uint64_scale (cur_level, 100, max_level);
901 get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering,
906 if (queue->high_percent <= 0) {
910 *is_buffering = FALSE;
913 #define GET_PERCENT(format,alt_max) \
914 get_percent(queue->cur_level.format,queue->max_level.format,(alt_max))
917 /* on EOS we are always 100% full, we set the var here so that it we can
918 * reuse the logic below to stop buffering */
920 GST_LOG_OBJECT (queue, "we are EOS");
922 GST_LOG_OBJECT (queue,
923 "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
924 queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
925 queue->cur_level.buffers);
927 /* figure out the percent we are filled, we take the max of all formats. */
928 if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
929 perc = GET_PERCENT (bytes, 0);
931 guint64 rb_size = queue->ring_buffer_max_size;
932 perc = GET_PERCENT (bytes, rb_size);
935 perc2 = GET_PERCENT (time, 0);
936 perc = MAX (perc, perc2);
938 perc2 = GET_PERCENT (buffers, 0);
939 perc = MAX (perc, perc2);
941 /* also apply the rate estimate when we need to */
942 if (queue->use_rate_estimate) {
943 perc2 = GET_PERCENT (rate_time, 0);
944 perc = MAX (perc, perc2);
947 /* Don't get to 0% unless we're really empty */
948 if (queue->cur_level.bytes > 0)
949 perc = MAX (1, perc);
954 *is_buffering = queue->is_buffering;
956 /* scale to high percent so that it becomes the 100% mark */
957 perc = perc * 100 / queue->high_percent;
965 GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering,
972 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
973 gint * avg_in, gint * avg_out, gint64 * buffering_left)
976 if (!QUEUE_IS_USING_QUEUE (queue)) {
977 if (QUEUE_IS_USING_RING_BUFFER (queue))
978 *mode = GST_BUFFERING_TIMESHIFT;
980 *mode = GST_BUFFERING_DOWNLOAD;
982 *mode = GST_BUFFERING_STREAM;
987 *avg_in = queue->byte_in_rate;
989 *avg_out = queue->byte_out_rate;
991 if (buffering_left) {
992 *buffering_left = (percent == 100 ? 0 : -1);
994 if (queue->use_rate_estimate) {
997 max = queue->max_level.rate_time;
998 cur = queue->cur_level.rate_time;
1000 if (percent != 100 && max > cur)
1001 *buffering_left = (max - cur) / 1000000;
1006 /* Called with the lock taken */
1008 gst_queue2_get_buffering_message (GstQueue2 * queue)
1010 GstMessage *msg = NULL;
1012 if (queue->percent_changed) {
1013 gint percent = queue->buffering_percent;
1015 queue->percent_changed = FALSE;
1017 GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
1018 msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
1020 gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
1021 queue->avg_out, queue->buffering_left);
1028 gst_queue2_post_buffering (GstQueue2 * queue)
1030 GstMessage *msg = NULL;
1032 g_mutex_lock (&queue->buffering_post_lock);
1033 GST_QUEUE2_MUTEX_LOCK (queue);
1034 msg = gst_queue2_get_buffering_message (queue);
1035 GST_QUEUE2_MUTEX_UNLOCK (queue);
1038 gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
1040 g_mutex_unlock (&queue->buffering_post_lock);
1044 update_buffering (GstQueue2 * queue)
1048 /* Ensure the variables used to calculate buffering state are up-to-date. */
1050 update_cur_level (queue, queue->current);
1051 update_in_rates (queue, FALSE);
1053 if (!get_buffering_percent (queue, NULL, &percent))
1056 if (queue->is_buffering) {
1057 /* if we were buffering see if we reached the high watermark */
1059 queue->is_buffering = FALSE;
1061 SET_PERCENT (queue, percent);
1063 /* we were not buffering, check if we need to start buffering if we drop
1064 * below the low threshold */
1065 if (percent < queue->low_percent) {
1066 queue->is_buffering = TRUE;
1067 SET_PERCENT (queue, percent);
1073 reset_rate_timer (GstQueue2 * queue)
1075 queue->bytes_in = 0;
1076 queue->bytes_out = 0;
1077 queue->byte_in_rate = 0.0;
1078 queue->byte_in_period = 0;
1079 queue->byte_out_rate = 0.0;
1080 queue->last_update_in_rates_elapsed = 0.0;
1081 queue->last_in_elapsed = 0.0;
1082 queue->last_out_elapsed = 0.0;
1083 queue->in_timer_started = FALSE;
1084 queue->out_timer_started = FALSE;
1087 /* the interval in seconds to recalculate the rate */
1088 #define RATE_INTERVAL 0.2
1089 /* Tuning for rate estimation. We use a large window for the input rate because
1090 * it should be stable when connected to a network. The output rate is less
1091 * stable (the elements preroll, queues behind a demuxer fill, ...) and should
1092 * therefore adapt more quickly.
1093 * However, initial input rate may be subject to a burst, and should therefore
1094 * initially also adapt more quickly to changes, and only later on give higher
1095 * weight to previous values. */
1096 #define AVG_IN(avg,val,w1,w2) ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
1097 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
1100 update_in_rates (GstQueue2 * queue, gboolean force)
1102 gdouble elapsed, period;
1103 gdouble byte_in_rate;
1105 if (!queue->in_timer_started) {
1106 queue->in_timer_started = TRUE;
1107 g_timer_start (queue->in_timer);
1111 queue->last_update_in_rates_elapsed = elapsed =
1112 g_timer_elapsed (queue->in_timer, NULL);
1114 /* recalc after each interval. */
1115 if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1116 period = elapsed - queue->last_in_elapsed;
1118 GST_DEBUG_OBJECT (queue,
1119 "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1120 period, queue->bytes_in, queue->byte_in_period);
1122 byte_in_rate = queue->bytes_in / period;
1124 if (queue->byte_in_rate == 0.0)
1125 queue->byte_in_rate = byte_in_rate;
1127 queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1128 (double) queue->byte_in_period, period);
1130 /* another data point, cap at 16 for long time running average */
1131 if (queue->byte_in_period < 16 * RATE_INTERVAL)
1132 queue->byte_in_period += period;
1134 /* reset the values to calculate rate over the next interval */
1135 queue->last_in_elapsed = elapsed;
1136 queue->bytes_in = 0;
1139 if (queue->byte_in_rate > 0.0) {
1140 queue->cur_level.rate_time =
1141 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1143 GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1144 queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1148 update_out_rates (GstQueue2 * queue)
1150 gdouble elapsed, period;
1151 gdouble byte_out_rate;
1153 if (!queue->out_timer_started) {
1154 queue->out_timer_started = TRUE;
1155 g_timer_start (queue->out_timer);
1159 elapsed = g_timer_elapsed (queue->out_timer, NULL);
1161 /* recalc after each interval. */
1162 if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1163 period = elapsed - queue->last_out_elapsed;
1165 GST_DEBUG_OBJECT (queue,
1166 "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1168 byte_out_rate = queue->bytes_out / period;
1170 if (queue->byte_out_rate == 0.0)
1171 queue->byte_out_rate = byte_out_rate;
1173 queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1175 /* reset the values to calculate rate over the next interval */
1176 queue->last_out_elapsed = elapsed;
1177 queue->bytes_out = 0;
1179 if (queue->byte_in_rate > 0.0) {
1180 queue->cur_level.rate_time =
1181 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1183 GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1184 queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1188 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1190 guint64 reading_pos, max_reading_pos;
1193 max_reading_pos = range->max_reading_pos;
1195 max_reading_pos = MAX (max_reading_pos, reading_pos);
1197 GST_DEBUG_OBJECT (queue,
1198 "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1199 G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1200 range->max_reading_pos = max_reading_pos;
1202 update_cur_level (queue, range);
1206 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1211 /* until we receive the FLUSH_STOP from this seek, we skip data */
1212 queue->seeking = TRUE;
1213 GST_QUEUE2_MUTEX_UNLOCK (queue);
1215 debug_ranges (queue);
1217 GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1220 gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1221 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1222 GST_SEEK_TYPE_NONE, -1);
1224 res = gst_pad_push_event (queue->sinkpad, event);
1225 GST_QUEUE2_MUTEX_LOCK (queue);
1228 /* Between us sending the seek event and re-acquiring the lock, the source
1229 * thread might already have pushed data and moved along the range's
1230 * writing_pos beyond the seek offset. In that case we don't want to set
1231 * the writing position back to the requested seek position, as it would
1232 * cause data to be written to the wrong offset in the file or ring buffer.
1233 * We still do the add_range call to switch the current range to the
1234 * requested range, or create one if one doesn't exist yet. */
1235 queue->current = add_range (queue, offset, FALSE);
1241 /* get the threshold for when we decide to seek rather than wait */
1243 get_seek_threshold (GstQueue2 * queue)
1247 /* FIXME, find a good threshold based on the incoming rate. */
1248 threshold = 1024 * 512;
1250 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1251 threshold = MIN (threshold,
1252 QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1257 /* see if there is enough data in the file to read a full buffer */
1259 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1261 GstQueue2Range *range;
1263 GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1266 if ((range = find_range (queue, offset))) {
1267 if (queue->current != range) {
1268 GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1269 perform_seek_to_offset (queue, range->writing_pos);
1272 GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1273 queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1275 /* we have a range for offset */
1276 GST_DEBUG_OBJECT (queue,
1277 "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1278 G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1280 if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1283 if (offset + length <= range->writing_pos)
1286 GST_DEBUG_OBJECT (queue,
1287 "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1288 (offset + length) - range->writing_pos);
1291 GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1292 " len %u", offset, length);
1293 /* we don't have the range, see how far away we are */
1294 if (!queue->is_eos && queue->current) {
1295 guint64 threshold = get_seek_threshold (queue);
1297 if (offset >= queue->current->offset && offset <=
1298 queue->current->writing_pos + threshold) {
1299 GST_INFO_OBJECT (queue,
1300 "requested data is within range, wait for data");
1305 /* too far away, do a seek */
1306 perform_seek_to_offset (queue, offset);
1313 #define FSEEK_FILE(file,offset) (fseeko (file, (off_t) offset, SEEK_SET) != 0)
1314 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1315 #define FSEEK_FILE(file,offset) (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
1317 #define FSEEK_FILE(file,offset) (fseek (file, offset, SEEK_SET) != 0)
1320 static GstFlowReturn
1321 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1322 guint8 * dst, gint64 * read_return)
1324 guint8 *ring_buffer;
1327 ring_buffer = queue->ring_buffer;
1329 if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1332 /* this should not block */
1333 GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1335 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1336 res = fread (dst, 1, length, queue->temp_file);
1338 memcpy (dst, ring_buffer + offset, length);
1342 GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1344 if (G_UNLIKELY (res < length)) {
1345 if (!QUEUE_IS_USING_TEMP_FILE (queue))
1346 goto could_not_read;
1347 /* check for errors or EOF */
1348 if (ferror (queue->temp_file))
1349 goto could_not_read;
1350 if (feof (queue->temp_file) && length > 0)
1360 GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1361 return GST_FLOW_ERROR;
1365 GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1366 return GST_FLOW_ERROR;
1370 GST_DEBUG ("non-regular file hits EOS");
1371 return GST_FLOW_EOS;
1375 static GstFlowReturn
1376 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1377 GstBuffer ** buffer)
1382 guint64 file_offset;
1383 guint block_length, remaining, read_length;
1387 GstFlowReturn ret = GST_FLOW_OK;
1389 /* allocate the output buffer of the requested size */
1390 if (*buffer == NULL)
1391 buf = gst_buffer_new_allocate (NULL, length, NULL);
1395 if (!gst_buffer_map (buf, &info, GST_MAP_WRITE))
1396 goto buffer_write_fail;
1399 GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1403 rb_size = queue->ring_buffer_max_size;
1404 max_size = QUEUE_MAX_BYTES (queue);
1407 while (remaining > 0) {
1408 /* configure how much/whether to read */
1409 if (!gst_queue2_have_data (queue, rpos, remaining)) {
1412 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1415 /* calculate how far away the offset is */
1416 if (queue->current->writing_pos > rpos)
1417 level = queue->current->writing_pos - rpos;
1421 GST_DEBUG_OBJECT (queue,
1422 "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1423 ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1424 rpos, queue->current->writing_pos, level, max_size);
1426 if (level >= max_size) {
1427 /* we don't have the data but if we have a ring buffer that is full, we
1429 GST_DEBUG_OBJECT (queue,
1430 "ring buffer full, reading QUEUE_MAX_BYTES %"
1431 G_GUINT64_FORMAT " bytes", max_size);
1432 read_length = max_size;
1433 } else if (queue->is_eos) {
1434 /* won't get any more data so read any data we have */
1436 GST_DEBUG_OBJECT (queue,
1437 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1439 read_length = level;
1447 if (read_length == 0) {
1448 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1449 GST_DEBUG_OBJECT (queue,
1450 "update current position [%" G_GUINT64_FORMAT "-%"
1451 G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1452 update_cur_pos (queue, queue->current, rpos);
1453 GST_QUEUE2_SIGNAL_DEL (queue);
1456 if (queue->use_buffering)
1457 update_buffering (queue);
1459 GST_DEBUG_OBJECT (queue, "waiting for add");
1460 GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1464 /* we have the requested data so read it */
1465 read_length = remaining;
1468 /* set range reading_pos to actual reading position for this read */
1469 queue->current->reading_pos = rpos;
1471 /* configure how much and from where to read */
1472 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1474 (queue->current->rb_offset + (rpos -
1475 queue->current->offset)) % rb_size;
1476 if (file_offset + read_length > rb_size) {
1477 block_length = rb_size - file_offset;
1479 block_length = read_length;
1483 block_length = read_length;
1486 /* while we still have data to read, we loop */
1487 while (read_length > 0) {
1491 gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1492 data, &read_return);
1493 if (ret != GST_FLOW_OK)
1496 file_offset += read_return;
1497 if (QUEUE_IS_USING_RING_BUFFER (queue))
1498 file_offset %= rb_size;
1500 data += read_return;
1501 read_length -= read_return;
1502 block_length = read_length;
1503 remaining -= read_return;
1505 rpos = (queue->current->reading_pos += read_return);
1506 update_cur_pos (queue, queue->current, queue->current->reading_pos);
1508 GST_QUEUE2_SIGNAL_DEL (queue);
1509 GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1512 gst_buffer_unmap (buf, &info);
1513 gst_buffer_resize (buf, 0, length);
1515 GST_BUFFER_OFFSET (buf) = offset;
1516 GST_BUFFER_OFFSET_END (buf) = offset + length;
1525 GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1526 gst_buffer_unmap (buf, &info);
1527 if (*buffer == NULL)
1528 gst_buffer_unref (buf);
1529 return GST_FLOW_EOS;
1533 GST_DEBUG_OBJECT (queue, "we are flushing");
1534 gst_buffer_unmap (buf, &info);
1535 if (*buffer == NULL)
1536 gst_buffer_unref (buf);
1537 return GST_FLOW_FLUSHING;
1541 GST_DEBUG_OBJECT (queue, "we have a read error");
1542 gst_buffer_unmap (buf, &info);
1543 if (*buffer == NULL)
1544 gst_buffer_unref (buf);
1549 GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL),
1550 ("Can't write to buffer"));
1551 if (*buffer == NULL)
1552 gst_buffer_unref (buf);
1553 return GST_FLOW_ERROR;
1557 /* should be called with QUEUE_LOCK */
1558 static GstMiniObject *
1559 gst_queue2_read_item_from_file (GstQueue2 * queue)
1561 GstMiniObject *item;
1563 if (queue->stream_start_event != NULL) {
1564 item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1565 queue->stream_start_event = NULL;
1566 } else if (queue->starting_segment != NULL) {
1567 item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1568 queue->starting_segment = NULL;
1571 GstBuffer *buffer = NULL;
1572 guint64 reading_pos;
1574 reading_pos = queue->current->reading_pos;
1577 gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1582 item = GST_MINI_OBJECT_CAST (buffer);
1585 item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1595 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1596 * the temp filename. */
1598 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1603 if (queue->temp_file)
1604 goto already_opened;
1606 GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1608 /* If temp_template was set, allocate a filename and open that file */
1611 if (queue->temp_template == NULL)
1614 /* make copy of the template, we don't want to change this */
1615 name = g_strdup (queue->temp_template);
1618 fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1620 fd = g_mkstemp (name);
1624 goto mkstemp_failed;
1626 /* open the file for update/writing */
1627 queue->temp_file = fdopen (fd, "wb+");
1628 /* error creating file */
1629 if (queue->temp_file == NULL)
1632 g_free (queue->temp_location);
1633 queue->temp_location = name;
1635 GST_QUEUE2_MUTEX_UNLOCK (queue);
1637 /* we can't emit the notify with the lock */
1638 g_object_notify (G_OBJECT (queue), "temp-location");
1640 GST_QUEUE2_MUTEX_LOCK (queue);
1642 GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1649 GST_DEBUG_OBJECT (queue, "temp file was already open");
1654 GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1655 (_("No Temp directory specified.")), (NULL));
1660 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1661 (_("Could not create temp file \"%s\"."), queue->temp_template),
1668 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1669 (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1678 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1681 if (queue->temp_file == NULL)
1684 GST_DEBUG_OBJECT (queue, "closing temp file");
1686 fflush (queue->temp_file);
1687 fclose (queue->temp_file);
1689 if (queue->temp_remove) {
1690 if (remove (queue->temp_location) < 0) {
1691 GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1692 queue->temp_location, g_strerror (errno));
1696 queue->temp_file = NULL;
1697 clean_ranges (queue);
1701 gst_queue2_flush_temp_file (GstQueue2 * queue)
1703 if (queue->temp_file == NULL)
1706 GST_DEBUG_OBJECT (queue, "flushing temp file");
1708 queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1712 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1714 if (!QUEUE_IS_USING_QUEUE (queue)) {
1715 if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1716 gst_queue2_flush_temp_file (queue);
1717 init_ranges (queue);
1719 while (!g_queue_is_empty (&queue->queue)) {
1720 GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
1722 if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1723 && GST_EVENT_IS_STICKY (qitem->item)
1724 && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1725 && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1726 gst_pad_store_sticky_event (queue->srcpad,
1727 GST_EVENT_CAST (qitem->item));
1730 /* Then lose another reference because we are supposed to destroy that
1731 data when flushing */
1732 if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1733 gst_mini_object_unref (qitem->item);
1734 g_slice_free (GstQueue2Item, qitem);
1737 queue->last_query = FALSE;
1738 g_cond_signal (&queue->query_handled);
1739 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1740 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1741 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1742 queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1743 queue->sink_tainted = queue->src_tainted = TRUE;
1744 if (queue->starting_segment != NULL)
1745 gst_event_unref (queue->starting_segment);
1746 queue->starting_segment = NULL;
1747 queue->segment_event_received = FALSE;
1748 gst_event_replace (&queue->stream_start_event, NULL);
1750 /* we deleted a lot of something */
1751 GST_QUEUE2_SIGNAL_DEL (queue);
1755 gst_queue2_wait_free_space (GstQueue2 * queue)
1757 /* We make space available if we're "full" according to whatever
1758 * the user defined as "full". */
1759 if (gst_queue2_is_filled (queue)) {
1762 /* pause the timer while we wait. The fact that we are waiting does not mean
1763 * the byterate on the input pad is lower */
1764 if ((started = queue->in_timer_started))
1765 g_timer_stop (queue->in_timer);
1767 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1768 "queue is full, waiting for free space");
1770 /* Wait for space to be available, we could be unlocked because of a flush. */
1771 GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1773 while (gst_queue2_is_filled (queue));
1775 /* and continue if we were running before */
1777 g_timer_continue (queue->in_timer);
1784 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1790 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1793 guint8 *data, *ring_buffer;
1794 guint size, rb_size;
1795 guint64 writing_pos, new_writing_pos;
1796 GstQueue2Range *range, *prev, *next;
1797 gboolean do_seek = FALSE;
1799 if (QUEUE_IS_USING_RING_BUFFER (queue))
1800 writing_pos = queue->current->rb_writing_pos;
1802 writing_pos = queue->current->writing_pos;
1803 ring_buffer = queue->ring_buffer;
1804 rb_size = queue->ring_buffer_max_size;
1806 if (!gst_buffer_map (buffer, &info, GST_MAP_READ))
1807 goto buffer_read_error;
1812 GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1816 if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1817 GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1818 GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1819 "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1820 GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1826 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1829 /* calculate the space in the ring buffer not used by data from
1830 * the current range */
1831 while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1832 /* wait until there is some free space */
1833 GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1835 /* get the amount of space we have */
1836 space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1838 /* calculate if we need to split or if we can write the entire
1840 to_write = MIN (size, space);
1842 /* the writing position in the ring buffer after writing (part
1843 * or all of) the buffer */
1844 new_writing_pos = (writing_pos + to_write) % rb_size;
1847 range = queue->ranges;
1849 /* if we need to overwrite data in the ring buffer, we need to
1852 * warning: this code is complicated and includes some
1853 * simplifications - pen, paper and diagrams for the cases
1856 guint64 range_data_start, range_data_end;
1857 GstQueue2Range *range_to_destroy = NULL;
1859 if (range == queue->current)
1862 range_data_start = range->rb_offset;
1863 range_data_end = range->rb_writing_pos;
1865 /* handle the special case where the range has no data in it */
1866 if (range->writing_pos == range->offset) {
1867 if (range != queue->current) {
1868 GST_DEBUG_OBJECT (queue,
1869 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1870 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1872 range_to_destroy = range;
1874 prev->next = range->next;
1879 if (range_data_end > range_data_start) {
1880 if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1883 if (new_writing_pos > range_data_start) {
1884 if (new_writing_pos >= range_data_end) {
1885 GST_DEBUG_OBJECT (queue,
1886 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1887 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1889 range_to_destroy = range;
1891 prev->next = range->next;
1893 GST_DEBUG_OBJECT (queue,
1894 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1895 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1896 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1897 range->offset + new_writing_pos - range_data_start,
1899 range->offset += (new_writing_pos - range_data_start);
1900 range->rb_offset = new_writing_pos;
1904 guint64 new_wpos_virt = writing_pos + to_write;
1906 if (new_wpos_virt <= range_data_start)
1909 if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1910 GST_DEBUG_OBJECT (queue,
1911 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1912 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1914 range_to_destroy = range;
1916 prev->next = range->next;
1918 GST_DEBUG_OBJECT (queue,
1919 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1920 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1921 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1922 range->offset + new_writing_pos - range_data_start,
1924 range->offset += (new_wpos_virt - range_data_start);
1925 range->rb_offset = new_writing_pos;
1930 if (!range_to_destroy)
1933 range = range->next;
1934 if (range_to_destroy) {
1935 if (range_to_destroy == queue->ranges)
1936 queue->ranges = range;
1937 g_slice_free (GstQueue2Range, range_to_destroy);
1938 range_to_destroy = NULL;
1943 new_writing_pos = writing_pos + to_write;
1946 if (QUEUE_IS_USING_TEMP_FILE (queue)
1947 && FSEEK_FILE (queue->temp_file, writing_pos))
1950 if (new_writing_pos > writing_pos) {
1951 GST_INFO_OBJECT (queue,
1952 "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1953 "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
1954 queue->current->writing_pos, queue->current->rb_writing_pos);
1955 /* either not using ring buffer or no wrapping, just write */
1956 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1957 if (fwrite (data, to_write, 1, queue->temp_file) != 1)
1960 memcpy (ring_buffer + writing_pos, data, to_write);
1963 if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1964 /* try to merge with next range */
1965 while ((next = queue->current->next)) {
1966 GST_INFO_OBJECT (queue,
1967 "checking merge with next range %" G_GUINT64_FORMAT " < %"
1968 G_GUINT64_FORMAT, new_writing_pos, next->offset);
1969 if (new_writing_pos < next->offset)
1972 GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
1975 /* remove the group */
1976 queue->current->next = next->next;
1978 /* We use the threshold to decide if we want to do a seek or simply
1979 * read the data again. If there is not so much data in the range we
1980 * prefer to avoid to seek and read it again. */
1981 if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
1982 /* the new range had more data than the threshold, it's worth keeping
1983 * it and doing a seek. */
1984 new_writing_pos = next->writing_pos;
1987 g_slice_free (GstQueue2Range, next);
1989 goto update_and_signal;
1993 guint block_one, block_two;
1995 block_one = rb_size - writing_pos;
1996 block_two = to_write - block_one;
1998 if (block_one > 0) {
1999 GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
2000 /* write data to end of ring buffer */
2001 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2002 if (fwrite (data, block_one, 1, queue->temp_file) != 1)
2005 memcpy (ring_buffer + writing_pos, data, block_one);
2009 if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
2012 if (block_two > 0) {
2013 GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
2014 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2015 if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
2018 memcpy (ring_buffer, data + block_one, block_two);
2024 /* update the writing positions */
2026 GST_INFO_OBJECT (queue,
2027 "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
2028 to_write, writing_pos, size);
2030 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2032 queue->current->writing_pos += to_write;
2033 queue->current->rb_writing_pos = writing_pos = new_writing_pos;
2035 queue->current->writing_pos = writing_pos = new_writing_pos;
2038 perform_seek_to_offset (queue, new_writing_pos);
2040 update_cur_level (queue, queue->current);
2042 /* update the buffering status */
2043 if (queue->use_buffering) {
2045 update_buffering (queue);
2046 msg = gst_queue2_get_buffering_message (queue);
2048 GST_QUEUE2_MUTEX_UNLOCK (queue);
2049 g_mutex_lock (&queue->buffering_post_lock);
2050 gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
2051 g_mutex_unlock (&queue->buffering_post_lock);
2052 GST_QUEUE2_MUTEX_LOCK (queue);
2056 GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2057 queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2059 GST_QUEUE2_SIGNAL_ADD (queue);
2062 gst_buffer_unmap (buffer, &info);
2069 GST_DEBUG_OBJECT (queue, "we are flushing");
2070 gst_buffer_unmap (buffer, &info);
2071 /* FIXME - GST_FLOW_EOS ? */
2076 GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2077 gst_buffer_unmap (buffer, &info);
2084 GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2088 GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2089 (_("Error while writing to download file.")),
2090 ("%s", g_strerror (errno)));
2093 gst_buffer_unmap (buffer, &info);
2098 GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL),
2099 ("Can't read from buffer"));
2105 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2107 GstQueue2 *queue = q;
2109 GST_TRACE_OBJECT (queue,
2110 "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2111 gst_buffer_get_size (*buf));
2113 if (!gst_queue2_create_write (queue, *buf)) {
2114 GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2121 buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
2123 guint *p_size = data;
2126 buf_size = gst_buffer_get_size (*buf);
2127 GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
2128 *p_size += buf_size;
2132 /* enqueue an item an update the level stats */
2134 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2135 GstQueue2ItemType item_type)
2137 if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2141 buffer = GST_BUFFER_CAST (item);
2142 size = gst_buffer_get_size (buffer);
2144 /* add buffer to the statistics */
2145 if (QUEUE_IS_USING_QUEUE (queue)) {
2146 queue->cur_level.buffers++;
2147 queue->cur_level.bytes += size;
2149 queue->bytes_in += size;
2151 /* apply new buffer to segment stats */
2152 apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE);
2153 /* update the byterate stats */
2154 update_in_rates (queue, FALSE);
2156 if (!QUEUE_IS_USING_QUEUE (queue)) {
2157 /* FIXME - check return value? */
2158 gst_queue2_create_write (queue, buffer);
2160 } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2161 GstBufferList *buffer_list;
2164 buffer_list = GST_BUFFER_LIST_CAST (item);
2166 gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2167 GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2169 /* add buffer to the statistics */
2170 if (QUEUE_IS_USING_QUEUE (queue)) {
2171 queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2172 queue->cur_level.bytes += size;
2174 queue->bytes_in += size;
2176 /* apply new buffer to segment stats */
2177 apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2179 /* update the byterate stats */
2180 update_in_rates (queue, FALSE);
2182 if (!QUEUE_IS_USING_QUEUE (queue)) {
2183 gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2185 } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2188 event = GST_EVENT_CAST (item);
2190 switch (GST_EVENT_TYPE (event)) {
2192 /* Zero the thresholds, this makes sure the queue is completely
2193 * filled and we can read all data from the queue. */
2194 GST_DEBUG_OBJECT (queue, "we have EOS");
2195 queue->is_eos = TRUE;
2196 /* Force updating the input bitrate */
2197 update_in_rates (queue, TRUE);
2199 case GST_EVENT_SEGMENT:
2200 apply_segment (queue, event, &queue->sink_segment, TRUE);
2201 /* This is our first new segment, we hold it
2202 * as we can't save it on the temp file */
2203 if (!QUEUE_IS_USING_QUEUE (queue)) {
2204 if (queue->segment_event_received)
2205 goto unexpected_event;
2207 queue->segment_event_received = TRUE;
2208 if (queue->starting_segment != NULL)
2209 gst_event_unref (queue->starting_segment);
2210 queue->starting_segment = event;
2213 /* a new segment allows us to accept more buffers if we got EOS
2214 * from downstream */
2215 queue->unexpected = FALSE;
2218 apply_gap (queue, event, &queue->sink_segment, TRUE);
2220 case GST_EVENT_STREAM_START:
2221 if (!QUEUE_IS_USING_QUEUE (queue)) {
2222 gst_event_replace (&queue->stream_start_event, event);
2223 gst_event_unref (event);
2227 case GST_EVENT_CAPS:{
2230 gst_event_parse_caps (event, &caps);
2231 GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2233 if (!QUEUE_IS_USING_QUEUE (queue)) {
2234 GST_LOG ("Dropping caps event, not using queue");
2235 gst_event_unref (event);
2241 if (!QUEUE_IS_USING_QUEUE (queue))
2242 goto unexpected_event;
2245 } else if (GST_IS_QUERY (item)) {
2246 /* Can't happen as we check that in the caller */
2247 if (!QUEUE_IS_USING_QUEUE (queue))
2248 g_assert_not_reached ();
2250 g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2251 item, GST_OBJECT_NAME (queue));
2252 /* we can't really unref since we don't know what it is */
2257 /* update the buffering status */
2258 if (queue->use_buffering)
2259 update_buffering (queue);
2261 if (QUEUE_IS_USING_QUEUE (queue)) {
2262 GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
2263 qitem->type = item_type;
2265 g_queue_push_tail (&queue->queue, qitem);
2267 gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2270 GST_QUEUE2_SIGNAL_ADD (queue);
2278 gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2280 GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2281 "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2282 GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2283 gst_event_unref (GST_EVENT_CAST (item));
2288 /* dequeue an item from the queue and update level stats */
2289 static GstMiniObject *
2290 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2292 GstMiniObject *item;
2294 if (!QUEUE_IS_USING_QUEUE (queue)) {
2295 item = gst_queue2_read_item_from_file (queue);
2297 GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
2303 g_slice_free (GstQueue2Item, qitem);
2309 if (GST_IS_BUFFER (item)) {
2313 buffer = GST_BUFFER_CAST (item);
2314 size = gst_buffer_get_size (buffer);
2315 *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2317 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2318 "retrieved buffer %p from queue", buffer);
2320 if (QUEUE_IS_USING_QUEUE (queue)) {
2321 queue->cur_level.buffers--;
2322 queue->cur_level.bytes -= size;
2324 queue->bytes_out += size;
2326 apply_buffer (queue, buffer, &queue->src_segment, size, FALSE);
2327 /* update the byterate stats */
2328 update_out_rates (queue);
2329 /* update the buffering */
2330 if (queue->use_buffering)
2331 update_buffering (queue);
2333 } else if (GST_IS_EVENT (item)) {
2334 GstEvent *event = GST_EVENT_CAST (item);
2336 *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2338 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2339 "retrieved event %p from queue", event);
2341 switch (GST_EVENT_TYPE (event)) {
2343 /* queue is empty now that we dequeued the EOS */
2344 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2346 case GST_EVENT_SEGMENT:
2347 apply_segment (queue, event, &queue->src_segment, FALSE);
2350 apply_gap (queue, event, &queue->src_segment, FALSE);
2355 } else if (GST_IS_BUFFER_LIST (item)) {
2356 GstBufferList *buffer_list;
2359 buffer_list = GST_BUFFER_LIST_CAST (item);
2360 gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
2361 *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2363 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2364 "retrieved buffer list %p from queue", buffer_list);
2366 if (QUEUE_IS_USING_QUEUE (queue)) {
2367 queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2368 queue->cur_level.bytes -= size;
2370 queue->bytes_out += size;
2372 apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2373 /* update the byterate stats */
2374 update_out_rates (queue);
2375 /* update the buffering */
2376 if (queue->use_buffering)
2377 update_buffering (queue);
2378 } else if (GST_IS_QUERY (item)) {
2379 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2380 "retrieved query %p from queue", item);
2381 *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2384 ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2385 item, GST_OBJECT_NAME (queue));
2387 *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2389 GST_QUEUE2_SIGNAL_DEL (queue);
2396 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2402 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2405 gboolean ret = TRUE;
2408 queue = GST_QUEUE2 (parent);
2410 switch (GST_EVENT_TYPE (event)) {
2411 case GST_EVENT_FLUSH_START:
2413 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
2414 if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2416 ret = gst_pad_push_event (queue->srcpad, event);
2418 /* now unblock the chain function */
2419 GST_QUEUE2_MUTEX_LOCK (queue);
2420 queue->srcresult = GST_FLOW_FLUSHING;
2421 queue->sinkresult = GST_FLOW_FLUSHING;
2422 /* unblock the loop and chain functions */
2423 GST_QUEUE2_SIGNAL_ADD (queue);
2424 GST_QUEUE2_SIGNAL_DEL (queue);
2425 GST_QUEUE2_MUTEX_UNLOCK (queue);
2427 /* make sure it pauses, this should happen since we sent
2428 * flush_start downstream. */
2429 gst_pad_pause_task (queue->srcpad);
2430 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2432 GST_QUEUE2_MUTEX_LOCK (queue);
2433 queue->last_query = FALSE;
2434 g_cond_signal (&queue->query_handled);
2435 GST_QUEUE2_MUTEX_UNLOCK (queue);
2437 GST_QUEUE2_MUTEX_LOCK (queue);
2438 /* flush the sink pad */
2439 queue->sinkresult = GST_FLOW_FLUSHING;
2440 GST_QUEUE2_SIGNAL_DEL (queue);
2441 queue->last_query = FALSE;
2442 g_cond_signal (&queue->query_handled);
2443 GST_QUEUE2_MUTEX_UNLOCK (queue);
2445 gst_event_unref (event);
2449 case GST_EVENT_FLUSH_STOP:
2451 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
2453 if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2455 ret = gst_pad_push_event (queue->srcpad, event);
2457 GST_QUEUE2_MUTEX_LOCK (queue);
2458 gst_queue2_locked_flush (queue, FALSE, TRUE);
2459 queue->srcresult = GST_FLOW_OK;
2460 queue->sinkresult = GST_FLOW_OK;
2461 queue->is_eos = FALSE;
2462 queue->unexpected = FALSE;
2463 queue->seeking = FALSE;
2464 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2465 /* reset rate counters */
2466 reset_rate_timer (queue);
2467 gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2468 queue->srcpad, NULL);
2469 GST_QUEUE2_MUTEX_UNLOCK (queue);
2471 GST_QUEUE2_MUTEX_LOCK (queue);
2472 queue->segment_event_received = FALSE;
2473 queue->is_eos = FALSE;
2474 queue->unexpected = FALSE;
2475 queue->sinkresult = GST_FLOW_OK;
2476 queue->seeking = FALSE;
2477 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2478 GST_QUEUE2_MUTEX_UNLOCK (queue);
2480 gst_event_unref (event);
2484 case GST_EVENT_TAG:{
2485 if (queue->use_tags_bitrate) {
2489 gst_event_parse_tag (event, &tags);
2490 if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2491 gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2492 GST_QUEUE2_MUTEX_LOCK (queue);
2493 queue->sink_tags_bitrate = bitrate;
2494 GST_QUEUE2_MUTEX_UNLOCK (queue);
2495 GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
2501 if (GST_EVENT_IS_SERIALIZED (event)) {
2502 /* serialized events go in the queue */
2503 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2504 if (queue->srcresult != GST_FLOW_OK) {
2505 /* Errors in sticky event pushing are no problem and ignored here
2506 * as they will cause more meaningful errors during data flow.
2507 * For EOS events, that are not followed by data flow, we still
2508 * return FALSE here though and report an error.
2510 if (!GST_EVENT_IS_STICKY (event)) {
2511 goto out_flow_error;
2512 } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2513 if (queue->srcresult == GST_FLOW_NOT_LINKED
2514 || queue->srcresult < GST_FLOW_EOS) {
2515 GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2516 (_("Internal data flow error.")),
2517 ("streaming task paused, reason %s (%d)",
2518 gst_flow_get_name (queue->srcresult), queue->srcresult));
2520 goto out_flow_error;
2523 /* refuse more events on EOS */
2526 gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2527 GST_QUEUE2_MUTEX_UNLOCK (queue);
2528 gst_queue2_post_buffering (queue);
2530 /* non-serialized events are passed upstream. */
2531 ret = gst_pad_push_event (queue->srcpad, event);
2540 GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2541 GST_QUEUE2_MUTEX_UNLOCK (queue);
2542 gst_event_unref (event);
2547 GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2548 GST_QUEUE2_MUTEX_UNLOCK (queue);
2549 gst_event_unref (event);
2554 GST_LOG_OBJECT (queue,
2555 "refusing event, we have a downstream flow error: %s",
2556 gst_flow_get_name (queue->srcresult));
2557 GST_QUEUE2_MUTEX_UNLOCK (queue);
2558 gst_event_unref (event);
2564 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2570 queue = GST_QUEUE2 (parent);
2572 switch (GST_QUERY_TYPE (query)) {
2574 if (GST_QUERY_IS_SERIALIZED (query)) {
2575 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
2576 /* serialized events go in the queue. We need to be certain that we
2577 * don't cause deadlocks waiting for the query return value. We check if
2578 * the queue is empty (nothing is blocking downstream and the query can
2579 * be pushed for sure) or we are not buffering. If we are buffering,
2580 * the pipeline waits to unblock downstream until our queue fills up
2581 * completely, which can not happen if we block on the query..
2582 * Therefore we only potentially block when we are not buffering. */
2583 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2584 if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue)
2585 || !queue->use_buffering)) {
2586 if (!g_atomic_int_get (&queue->downstream_may_block)) {
2587 gst_queue2_locked_enqueue (queue, query,
2588 GST_QUEUE2_ITEM_TYPE_QUERY);
2590 STATUS (queue, queue->sinkpad, "wait for QUERY");
2591 g_cond_wait (&queue->query_handled, &queue->qlock);
2592 if (queue->sinkresult != GST_FLOW_OK)
2594 res = queue->last_query;
2596 GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2600 GST_DEBUG_OBJECT (queue,
2601 "refusing query, we are not using the queue");
2604 GST_QUEUE2_MUTEX_UNLOCK (queue);
2605 gst_queue2_post_buffering (queue);
2607 res = gst_pad_query_default (pad, parent, query);
2616 GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
2617 GST_QUEUE2_MUTEX_UNLOCK (queue);
2623 gst_queue2_is_empty (GstQueue2 * queue)
2625 /* never empty on EOS */
2629 if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2630 return queue->current->writing_pos <= queue->current->max_reading_pos;
2632 if (queue->queue.length == 0)
2640 gst_queue2_is_filled (GstQueue2 * queue)
2644 /* always filled on EOS */
2648 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2649 (queue->cur_level.format) >= ((alt_max) ? \
2650 MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2652 /* if using a ring buffer we're filled if all ring buffer space is used
2653 * _by the current range_ */
2654 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2655 guint64 rb_size = queue->ring_buffer_max_size;
2656 GST_DEBUG_OBJECT (queue,
2657 "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2658 queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2659 return CHECK_FILLED (bytes, rb_size);
2662 /* if using file, we're never filled if we don't have EOS */
2663 if (QUEUE_IS_USING_TEMP_FILE (queue))
2666 /* we are never filled when we have no buffers at all */
2667 if (queue->cur_level.buffers == 0)
2670 /* we are filled if one of the current levels exceeds the max */
2671 res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2672 || CHECK_FILLED (time, 0);
2674 /* if we need to, use the rate estimate to check against the max time we are
2675 * allowed to queue */
2676 if (queue->use_rate_estimate)
2677 res |= CHECK_FILLED (rate_time, 0);
2683 static GstFlowReturn
2684 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2685 GstMiniObject * item, GstQueue2ItemType item_type)
2687 /* we have to lock the queue since we span threads */
2688 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2689 /* when we received EOS, we refuse more data */
2692 /* when we received unexpected from downstream, refuse more buffers */
2693 if (queue->unexpected)
2694 goto out_unexpected;
2696 /* while we didn't receive the newsegment, we're seeking and we skip data */
2700 if (!gst_queue2_wait_free_space (queue))
2703 /* put buffer in queue now */
2704 gst_queue2_locked_enqueue (queue, item, item_type);
2705 GST_QUEUE2_MUTEX_UNLOCK (queue);
2706 gst_queue2_post_buffering (queue);
2710 /* special conditions */
2713 GstFlowReturn ret = queue->sinkresult;
2715 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2716 "exit because task paused, reason: %s", gst_flow_get_name (ret));
2717 GST_QUEUE2_MUTEX_UNLOCK (queue);
2718 gst_mini_object_unref (item);
2724 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2725 GST_QUEUE2_MUTEX_UNLOCK (queue);
2726 gst_mini_object_unref (item);
2728 return GST_FLOW_EOS;
2732 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2733 GST_QUEUE2_MUTEX_UNLOCK (queue);
2734 gst_mini_object_unref (item);
2740 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2741 GST_QUEUE2_MUTEX_UNLOCK (queue);
2742 gst_mini_object_unref (item);
2744 return GST_FLOW_EOS;
2748 static GstFlowReturn
2749 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2753 queue = GST_QUEUE2 (parent);
2755 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2756 "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2757 GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
2758 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2759 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2761 return gst_queue2_chain_buffer_or_buffer_list (queue,
2762 GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
2765 static GstFlowReturn
2766 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
2767 GstBufferList * buffer_list)
2771 queue = GST_QUEUE2 (parent);
2773 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2774 "received buffer list %p", buffer_list);
2776 return gst_queue2_chain_buffer_or_buffer_list (queue,
2777 GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2780 static GstMiniObject *
2781 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
2783 GstMiniObject *data;
2785 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
2787 /* stop pushing buffers, we dequeue all items until we see an item that we
2788 * can push again, which is EOS or SEGMENT. If there is nothing in the
2789 * queue we can push, we set a flag to make the sinkpad refuse more
2790 * buffers with an EOS return value until we receive something
2791 * pushable again or we get flushed. */
2792 while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
2793 if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2794 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2795 "dropping EOS buffer %p", data);
2796 gst_buffer_unref (GST_BUFFER_CAST (data));
2797 } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2798 GstEvent *event = GST_EVENT_CAST (data);
2799 GstEventType type = GST_EVENT_TYPE (event);
2801 if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
2802 /* we found a pushable item in the queue, push it out */
2803 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2804 "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
2807 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2808 "dropping EOS event %p", event);
2809 gst_event_unref (event);
2810 } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2811 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2812 "dropping EOS buffer list %p", data);
2813 gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
2814 } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2815 queue->last_query = FALSE;
2816 g_cond_signal (&queue->query_handled);
2817 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
2820 /* no more items in the queue. Set the unexpected flag so that upstream
2821 * make us refuse any more buffers on the sinkpad. Since we will still
2822 * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
2823 * task function does not shut down. */
2824 queue->unexpected = TRUE;
2828 /* dequeue an item from the queue an push it downstream. This functions returns
2829 * the result of the push. */
2830 static GstFlowReturn
2831 gst_queue2_push_one (GstQueue2 * queue)
2833 GstFlowReturn result = queue->srcresult;
2834 GstMiniObject *data;
2835 GstQueue2ItemType item_type;
2837 data = gst_queue2_locked_dequeue (queue, &item_type);
2842 STATUS (queue, queue->srcpad, "We have something dequeud");
2843 g_atomic_int_set (&queue->downstream_may_block,
2844 item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
2845 item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
2846 GST_QUEUE2_MUTEX_UNLOCK (queue);
2847 gst_queue2_post_buffering (queue);
2849 if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2852 buffer = GST_BUFFER_CAST (data);
2854 result = gst_pad_push (queue->srcpad, buffer);
2855 g_atomic_int_set (&queue->downstream_may_block, 0);
2857 /* need to check for srcresult here as well */
2858 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2859 if (result == GST_FLOW_EOS) {
2860 data = gst_queue2_dequeue_on_eos (queue, &item_type);
2863 /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2864 * to the caller so that the task function does not shut down */
2865 result = GST_FLOW_OK;
2867 } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2868 GstEvent *event = GST_EVENT_CAST (data);
2869 GstEventType type = GST_EVENT_TYPE (event);
2871 if (type == GST_EVENT_TAG) {
2872 if (queue->use_tags_bitrate) {
2876 gst_event_parse_tag (event, &tags);
2877 if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2878 gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2879 GST_QUEUE2_MUTEX_LOCK (queue);
2880 queue->src_tags_bitrate = bitrate;
2881 GST_QUEUE2_MUTEX_UNLOCK (queue);
2882 GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
2887 gst_pad_push_event (queue->srcpad, event);
2889 /* if we're EOS, return EOS so that the task pauses. */
2890 if (type == GST_EVENT_EOS) {
2891 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2892 "pushed EOS event %p, return EOS", event);
2893 result = GST_FLOW_EOS;
2896 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2897 } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2898 GstBufferList *buffer_list;
2900 buffer_list = GST_BUFFER_LIST_CAST (data);
2902 result = gst_pad_push_list (queue->srcpad, buffer_list);
2903 g_atomic_int_set (&queue->downstream_may_block, 0);
2905 /* need to check for srcresult here as well */
2906 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2907 if (result == GST_FLOW_EOS) {
2908 data = gst_queue2_dequeue_on_eos (queue, &item_type);
2911 /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
2912 * to the caller so that the task function does not shut down */
2913 result = GST_FLOW_OK;
2915 } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
2916 GstQuery *query = GST_QUERY_CAST (data);
2918 GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
2919 queue->last_query = gst_pad_peer_query (queue->srcpad, query);
2920 GST_LOG_OBJECT (queue->srcpad, "Peered query");
2921 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2922 "did query %p, return %d", query, queue->last_query);
2923 g_cond_signal (&queue->query_handled);
2924 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2925 result = GST_FLOW_OK;
2932 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2933 "exit because we have no item in the queue");
2934 return GST_FLOW_ERROR;
2938 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2939 return GST_FLOW_FLUSHING;
2943 /* called repeatedly with @pad as the source pad. This function should push out
2944 * data to the peer element. */
2946 gst_queue2_loop (GstPad * pad)
2951 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2953 /* have to lock for thread-safety */
2954 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2956 if (gst_queue2_is_empty (queue)) {
2959 /* pause the timer while we wait. The fact that we are waiting does not mean
2960 * the byterate on the output pad is lower */
2961 if ((started = queue->out_timer_started))
2962 g_timer_stop (queue->out_timer);
2964 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2965 "queue is empty, waiting for new data");
2967 /* Wait for data to be available, we could be unlocked because of a flush. */
2968 GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2970 while (gst_queue2_is_empty (queue));
2972 /* and continue if we were running before */
2974 g_timer_continue (queue->out_timer);
2976 ret = gst_queue2_push_one (queue);
2977 queue->srcresult = ret;
2978 queue->sinkresult = ret;
2979 if (ret != GST_FLOW_OK)
2982 GST_QUEUE2_MUTEX_UNLOCK (queue);
2983 gst_queue2_post_buffering (queue);
2990 gboolean eos = queue->is_eos;
2991 GstFlowReturn ret = queue->srcresult;
2993 gst_pad_pause_task (queue->srcpad);
2994 if (ret == GST_FLOW_FLUSHING) {
2995 gst_queue2_locked_flush (queue, FALSE, FALSE);
2997 GST_QUEUE2_SIGNAL_DEL (queue);
2998 queue->last_query = FALSE;
2999 g_cond_signal (&queue->query_handled);
3001 GST_QUEUE2_MUTEX_UNLOCK (queue);
3002 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3003 "pause task, reason: %s", gst_flow_get_name (queue->srcresult));
3004 /* let app know about us giving up if upstream is not expected to do so */
3005 /* EOS is already taken care of elsewhere */
3006 if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
3007 GST_ELEMENT_ERROR (queue, STREAM, FAILED,
3008 (_("Internal data flow error.")),
3009 ("streaming task paused, reason %s (%d)",
3010 gst_flow_get_name (ret), ret));
3011 gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
3018 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3020 gboolean res = TRUE;
3021 GstQueue2 *queue = GST_QUEUE2 (parent);
3023 #ifndef GST_DISABLE_GST_DEBUG
3024 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
3025 event, GST_EVENT_TYPE_NAME (event));
3028 switch (GST_EVENT_TYPE (event)) {
3029 case GST_EVENT_FLUSH_START:
3030 if (QUEUE_IS_USING_QUEUE (queue)) {
3031 /* just forward upstream */
3032 res = gst_pad_push_event (queue->sinkpad, event);
3034 /* now unblock the getrange function */
3035 GST_QUEUE2_MUTEX_LOCK (queue);
3036 GST_DEBUG_OBJECT (queue, "flushing");
3037 queue->srcresult = GST_FLOW_FLUSHING;
3038 GST_QUEUE2_SIGNAL_ADD (queue);
3039 GST_QUEUE2_MUTEX_UNLOCK (queue);
3041 /* when using a temp file, we eat the event */
3043 gst_event_unref (event);
3046 case GST_EVENT_FLUSH_STOP:
3047 if (QUEUE_IS_USING_QUEUE (queue)) {
3048 /* just forward upstream */
3049 res = gst_pad_push_event (queue->sinkpad, event);
3051 /* now unblock the getrange function */
3052 GST_QUEUE2_MUTEX_LOCK (queue);
3053 queue->srcresult = GST_FLOW_OK;
3054 GST_QUEUE2_MUTEX_UNLOCK (queue);
3056 /* when using a temp file, we eat the event */
3058 gst_event_unref (event);
3061 case GST_EVENT_RECONFIGURE:
3062 GST_QUEUE2_MUTEX_LOCK (queue);
3063 /* assume downstream is linked now and try to push again */
3064 if (queue->srcresult == GST_FLOW_NOT_LINKED) {
3065 queue->srcresult = GST_FLOW_OK;
3066 queue->sinkresult = GST_FLOW_OK;
3067 if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
3068 gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
3072 GST_QUEUE2_MUTEX_UNLOCK (queue);
3074 res = gst_pad_push_event (queue->sinkpad, event);
3077 res = gst_pad_push_event (queue->sinkpad, event);
3085 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3089 queue = GST_QUEUE2 (parent);
3091 switch (GST_QUERY_TYPE (query)) {
3092 case GST_QUERY_POSITION:
3097 if (!gst_pad_peer_query (queue->sinkpad, query))
3100 /* get peer position */
3101 gst_query_parse_position (query, &format, &peer_pos);
3103 /* FIXME: this code assumes that there's no discont in the queue */
3105 case GST_FORMAT_BYTES:
3106 peer_pos -= queue->cur_level.bytes;
3108 case GST_FORMAT_TIME:
3109 peer_pos -= queue->cur_level.time;
3112 GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3113 "know how to adjust value", gst_format_get_name (format));
3116 /* set updated position */
3117 gst_query_set_position (query, format, peer_pos);
3120 case GST_QUERY_DURATION:
3122 GST_DEBUG_OBJECT (queue, "doing peer query");
3124 if (!gst_pad_peer_query (queue->sinkpad, query))
3127 GST_DEBUG_OBJECT (queue, "peer query success");
3130 case GST_QUERY_BUFFERING:
3133 gboolean is_buffering;
3134 GstBufferingMode mode;
3135 gint avg_in, avg_out;
3136 gint64 buffering_left;
3138 GST_DEBUG_OBJECT (queue, "query buffering");
3140 get_buffering_percent (queue, &is_buffering, &percent);
3141 gst_query_set_buffering_percent (query, is_buffering, percent);
3143 get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3145 gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3148 if (!QUEUE_IS_USING_QUEUE (queue)) {
3149 /* add ranges for download and ringbuffer buffering */
3151 gint64 start, stop, range_start, range_stop;
3152 guint64 writing_pos;
3153 gint64 estimated_total;
3155 gboolean peer_res, is_eos;
3156 GstQueue2Range *queued_ranges;
3158 /* we need a current download region */
3159 if (queue->current == NULL)
3162 writing_pos = queue->current->writing_pos;
3163 is_eos = queue->is_eos;
3166 /* we're EOS, we know the duration in bytes now */
3168 duration = writing_pos;
3170 /* get duration of upstream in bytes */
3171 peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3172 GST_FORMAT_BYTES, &duration);
3175 GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3176 ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3178 /* calculate remaining and total download time */
3179 if (peer_res && avg_in > 0.0)
3180 estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3182 estimated_total = -1;
3184 GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3187 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3190 case GST_FORMAT_PERCENT:
3191 /* we need duration */
3196 /* get our available data relative to the duration */
3199 gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3204 case GST_FORMAT_BYTES:
3214 /* fill out the buffered ranges */
3215 for (queued_ranges = queue->ranges; queued_ranges;
3216 queued_ranges = queued_ranges->next) {
3218 case GST_FORMAT_PERCENT:
3219 if (duration == -1) {
3225 gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3226 queued_ranges->offset, duration);
3228 gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3229 queued_ranges->writing_pos, duration);
3231 case GST_FORMAT_BYTES:
3232 range_start = queued_ranges->offset;
3233 range_stop = queued_ranges->writing_pos;
3240 if (range_start == range_stop)
3242 GST_DEBUG_OBJECT (queue,
3243 "range starting at %" G_GINT64_FORMAT " and finishing at %"
3244 G_GINT64_FORMAT, range_start, range_stop);
3245 gst_query_add_buffering_range (query, range_start, range_stop);
3248 gst_query_set_buffering_range (query, format, start, stop,
3253 case GST_QUERY_SCHEDULING:
3256 GstSchedulingFlags flags = 0;
3258 if (!gst_pad_peer_query (queue->sinkpad, query))
3261 gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
3263 /* we can operate in pull mode when we are using a tempfile */
3264 pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3267 flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3268 gst_query_set_scheduling (query, flags, 0, -1, 0);
3270 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3271 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3275 /* peer handled other queries */
3276 if (!gst_pad_query_default (pad, parent, query))
3286 GST_DEBUG_OBJECT (queue, "failed peer query");
3292 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3294 GstQueue2 *queue = GST_QUEUE2 (element);
3296 /* simply forward to the srcpad query function */
3297 return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3302 gst_queue2_update_upstream_size (GstQueue2 * queue)
3304 gint64 upstream_size = -1;
3306 if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3308 GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3310 /* upstream_size can be negative but queue->upstream_size is unsigned.
3311 * Prevent setting negative values to it (the query can return -1) */
3312 if (upstream_size >= 0)
3313 queue->upstream_size = upstream_size;
3315 queue->upstream_size = 0;
3319 static GstFlowReturn
3320 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3321 guint length, GstBuffer ** buffer)
3326 queue = GST_QUEUE2_CAST (parent);
3328 length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3329 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3330 offset = (offset == -1) ? queue->current->reading_pos : offset;
3332 GST_DEBUG_OBJECT (queue,
3333 "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3335 /* catch any reads beyond the size of the file here to make sure queue2
3336 * doesn't send seek events beyond the size of the file upstream, since
3337 * that would confuse elements such as souphttpsrc and/or http servers.
3338 * Demuxers often just loop until EOS at the end of the file to figure out
3339 * when they've read all the end-headers or index chunks. */
3340 if (G_UNLIKELY (offset >= queue->upstream_size)) {
3341 gst_queue2_update_upstream_size (queue);
3342 if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3343 goto out_unexpected;
3346 if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3347 gst_queue2_update_upstream_size (queue);
3348 if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3349 length = queue->upstream_size - offset;
3350 GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3354 /* FIXME - function will block when the range is not yet available */
3355 ret = gst_queue2_create_read (queue, offset, length, buffer);
3356 GST_QUEUE2_MUTEX_UNLOCK (queue);
3357 gst_queue2_post_buffering (queue);
3364 ret = queue->srcresult;
3366 GST_DEBUG_OBJECT (queue, "we are flushing");
3367 GST_QUEUE2_MUTEX_UNLOCK (queue);
3372 GST_DEBUG_OBJECT (queue, "read beyond end of file");
3373 GST_QUEUE2_MUTEX_UNLOCK (queue);
3374 return GST_FLOW_EOS;
3378 /* sink currently only operates in push mode */
3380 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3381 GstPadMode mode, gboolean active)
3386 queue = GST_QUEUE2 (parent);
3389 case GST_PAD_MODE_PUSH:
3391 GST_QUEUE2_MUTEX_LOCK (queue);
3392 GST_DEBUG_OBJECT (queue, "activating push mode");
3393 queue->srcresult = GST_FLOW_OK;
3394 queue->sinkresult = GST_FLOW_OK;
3395 queue->is_eos = FALSE;
3396 queue->unexpected = FALSE;
3397 reset_rate_timer (queue);
3398 GST_QUEUE2_MUTEX_UNLOCK (queue);
3400 /* unblock chain function */
3401 GST_QUEUE2_MUTEX_LOCK (queue);
3402 GST_DEBUG_OBJECT (queue, "deactivating push mode");
3403 queue->srcresult = GST_FLOW_FLUSHING;
3404 queue->sinkresult = GST_FLOW_FLUSHING;
3405 GST_QUEUE2_SIGNAL_DEL (queue);
3406 GST_QUEUE2_MUTEX_UNLOCK (queue);
3408 /* wait until it is unblocked and clean up */
3409 GST_PAD_STREAM_LOCK (pad);
3410 GST_QUEUE2_MUTEX_LOCK (queue);
3411 gst_queue2_locked_flush (queue, TRUE, FALSE);
3412 GST_QUEUE2_MUTEX_UNLOCK (queue);
3413 GST_PAD_STREAM_UNLOCK (pad);
3424 /* src operating in push mode, we start a task on the source pad that pushes out
3425 * buffers from the queue */
3427 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3429 gboolean result = FALSE;
3432 queue = GST_QUEUE2 (parent);
3435 GST_QUEUE2_MUTEX_LOCK (queue);
3436 GST_DEBUG_OBJECT (queue, "activating push mode");
3437 queue->srcresult = GST_FLOW_OK;
3438 queue->sinkresult = GST_FLOW_OK;
3439 queue->is_eos = FALSE;
3440 queue->unexpected = FALSE;
3442 gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3443 GST_QUEUE2_MUTEX_UNLOCK (queue);
3445 /* unblock loop function */
3446 GST_QUEUE2_MUTEX_LOCK (queue);
3447 GST_DEBUG_OBJECT (queue, "deactivating push mode");
3448 queue->srcresult = GST_FLOW_FLUSHING;
3449 queue->sinkresult = GST_FLOW_FLUSHING;
3450 /* the item add signal will unblock */
3451 GST_QUEUE2_SIGNAL_ADD (queue);
3452 GST_QUEUE2_MUTEX_UNLOCK (queue);
3454 /* step 2, make sure streaming finishes */
3455 result = gst_pad_stop_task (pad);
3461 /* pull mode, downstream will call our getrange function */
3463 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3468 queue = GST_QUEUE2 (parent);
3471 GST_QUEUE2_MUTEX_LOCK (queue);
3472 if (!QUEUE_IS_USING_QUEUE (queue)) {
3473 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3474 /* open the temp file now */
3475 result = gst_queue2_open_temp_location_file (queue);
3476 } else if (!queue->ring_buffer) {
3477 queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3478 result = ! !queue->ring_buffer;
3483 GST_DEBUG_OBJECT (queue, "activating pull mode");
3484 init_ranges (queue);
3485 queue->srcresult = GST_FLOW_OK;
3486 queue->sinkresult = GST_FLOW_OK;
3487 queue->is_eos = FALSE;
3488 queue->unexpected = FALSE;
3489 queue->upstream_size = 0;
3491 GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3492 /* this is not allowed, we cannot operate in pull mode without a temp
3494 queue->srcresult = GST_FLOW_FLUSHING;
3495 queue->sinkresult = GST_FLOW_FLUSHING;
3498 GST_QUEUE2_MUTEX_UNLOCK (queue);
3500 GST_QUEUE2_MUTEX_LOCK (queue);
3501 GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3502 queue->srcresult = GST_FLOW_FLUSHING;
3503 queue->sinkresult = GST_FLOW_FLUSHING;
3504 /* this will unlock getrange */
3505 GST_QUEUE2_SIGNAL_ADD (queue);
3507 GST_QUEUE2_MUTEX_UNLOCK (queue);
3514 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3520 case GST_PAD_MODE_PULL:
3521 res = gst_queue2_src_activate_pull (pad, parent, active);
3523 case GST_PAD_MODE_PUSH:
3524 res = gst_queue2_src_activate_push (pad, parent, active);
3527 GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3534 static GstStateChangeReturn
3535 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3538 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3540 queue = GST_QUEUE2 (element);
3542 switch (transition) {
3543 case GST_STATE_CHANGE_NULL_TO_READY:
3545 case GST_STATE_CHANGE_READY_TO_PAUSED:
3546 GST_QUEUE2_MUTEX_LOCK (queue);
3547 if (!QUEUE_IS_USING_QUEUE (queue)) {
3548 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3549 if (!gst_queue2_open_temp_location_file (queue))
3550 ret = GST_STATE_CHANGE_FAILURE;
3552 if (queue->ring_buffer) {
3553 g_free (queue->ring_buffer);
3554 queue->ring_buffer = NULL;
3556 if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3557 ret = GST_STATE_CHANGE_FAILURE;
3559 init_ranges (queue);
3561 queue->segment_event_received = FALSE;
3562 queue->starting_segment = NULL;
3563 gst_event_replace (&queue->stream_start_event, NULL);
3564 GST_QUEUE2_MUTEX_UNLOCK (queue);
3566 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3572 if (ret == GST_STATE_CHANGE_FAILURE)
3575 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3577 if (ret == GST_STATE_CHANGE_FAILURE)
3580 switch (transition) {
3581 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3583 case GST_STATE_CHANGE_PAUSED_TO_READY:
3584 GST_QUEUE2_MUTEX_LOCK (queue);
3585 if (!QUEUE_IS_USING_QUEUE (queue)) {
3586 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3587 gst_queue2_close_temp_location_file (queue);
3588 } else if (queue->ring_buffer) {
3589 g_free (queue->ring_buffer);
3590 queue->ring_buffer = NULL;
3592 clean_ranges (queue);
3594 if (queue->starting_segment != NULL) {
3595 gst_event_unref (queue->starting_segment);
3596 queue->starting_segment = NULL;
3598 gst_event_replace (&queue->stream_start_event, NULL);
3599 GST_QUEUE2_MUTEX_UNLOCK (queue);
3601 case GST_STATE_CHANGE_READY_TO_NULL:
3610 /* changing the capacity of the queue must wake up
3611 * the _chain function, it might have more room now
3612 * to store the buffer/event in the queue */
3613 #define QUEUE_CAPACITY_CHANGE(q) \
3614 GST_QUEUE2_SIGNAL_DEL (queue); \
3615 if (queue->use_buffering) \
3616 update_buffering (queue);
3618 /* Changing the minimum required fill level must
3619 * wake up the _loop function as it might now
3620 * be able to preceed.
3622 #define QUEUE_THRESHOLD_CHANGE(q)\
3623 GST_QUEUE2_SIGNAL_ADD (queue);
3626 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3630 /* the element must be stopped in order to do this */
3631 GST_OBJECT_LOCK (queue);
3632 state = GST_STATE (queue);
3633 if (state != GST_STATE_READY && state != GST_STATE_NULL)
3635 GST_OBJECT_UNLOCK (queue);
3637 /* set new location */
3638 g_free (queue->temp_template);
3639 queue->temp_template = g_strdup (template);
3646 GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3647 GST_OBJECT_UNLOCK (queue);
3652 gst_queue2_set_property (GObject * object,
3653 guint prop_id, const GValue * value, GParamSpec * pspec)
3655 GstQueue2 *queue = GST_QUEUE2 (object);
3657 /* someone could change levels here, and since this
3658 * affects the get/put funcs, we need to lock for safety. */
3659 GST_QUEUE2_MUTEX_LOCK (queue);
3662 case PROP_MAX_SIZE_BYTES:
3663 queue->max_level.bytes = g_value_get_uint (value);
3664 QUEUE_CAPACITY_CHANGE (queue);
3666 case PROP_MAX_SIZE_BUFFERS:
3667 queue->max_level.buffers = g_value_get_uint (value);
3668 QUEUE_CAPACITY_CHANGE (queue);
3670 case PROP_MAX_SIZE_TIME:
3671 queue->max_level.time = g_value_get_uint64 (value);
3672 /* set rate_time to the same value. We use an extra field in the level
3673 * structure so that we can easily access and compare it */
3674 queue->max_level.rate_time = queue->max_level.time;
3675 QUEUE_CAPACITY_CHANGE (queue);
3677 case PROP_USE_BUFFERING:
3678 queue->use_buffering = g_value_get_boolean (value);
3679 if (!queue->use_buffering && queue->is_buffering) {
3680 GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3681 "posting 100%% message");
3682 SET_PERCENT (queue, 100);
3683 queue->is_buffering = FALSE;
3686 if (queue->use_buffering) {
3687 queue->is_buffering = TRUE;
3688 update_buffering (queue);
3691 case PROP_USE_TAGS_BITRATE:
3692 queue->use_tags_bitrate = g_value_get_boolean (value);
3694 case PROP_USE_RATE_ESTIMATE:
3695 queue->use_rate_estimate = g_value_get_boolean (value);
3697 case PROP_LOW_PERCENT:
3698 queue->low_percent = g_value_get_int (value);
3700 case PROP_HIGH_PERCENT:
3701 queue->high_percent = g_value_get_int (value);
3703 case PROP_TEMP_TEMPLATE:
3704 gst_queue2_set_temp_template (queue, g_value_get_string (value));
3706 case PROP_TEMP_REMOVE:
3707 queue->temp_remove = g_value_get_boolean (value);
3709 case PROP_RING_BUFFER_MAX_SIZE:
3710 queue->ring_buffer_max_size = g_value_get_uint64 (value);
3713 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3717 GST_QUEUE2_MUTEX_UNLOCK (queue);
3718 gst_queue2_post_buffering (queue);
3722 gst_queue2_get_property (GObject * object,
3723 guint prop_id, GValue * value, GParamSpec * pspec)
3725 GstQueue2 *queue = GST_QUEUE2 (object);
3727 GST_QUEUE2_MUTEX_LOCK (queue);
3730 case PROP_CUR_LEVEL_BYTES:
3731 g_value_set_uint (value, queue->cur_level.bytes);
3733 case PROP_CUR_LEVEL_BUFFERS:
3734 g_value_set_uint (value, queue->cur_level.buffers);
3736 case PROP_CUR_LEVEL_TIME:
3737 g_value_set_uint64 (value, queue->cur_level.time);
3739 case PROP_MAX_SIZE_BYTES:
3740 g_value_set_uint (value, queue->max_level.bytes);
3742 case PROP_MAX_SIZE_BUFFERS:
3743 g_value_set_uint (value, queue->max_level.buffers);
3745 case PROP_MAX_SIZE_TIME:
3746 g_value_set_uint64 (value, queue->max_level.time);
3748 case PROP_USE_BUFFERING:
3749 g_value_set_boolean (value, queue->use_buffering);
3751 case PROP_USE_TAGS_BITRATE:
3752 g_value_set_boolean (value, queue->use_tags_bitrate);
3754 case PROP_USE_RATE_ESTIMATE:
3755 g_value_set_boolean (value, queue->use_rate_estimate);
3757 case PROP_LOW_PERCENT:
3758 g_value_set_int (value, queue->low_percent);
3760 case PROP_HIGH_PERCENT:
3761 g_value_set_int (value, queue->high_percent);
3763 case PROP_TEMP_TEMPLATE:
3764 g_value_set_string (value, queue->temp_template);
3766 case PROP_TEMP_LOCATION:
3767 g_value_set_string (value, queue->temp_location);
3769 case PROP_TEMP_REMOVE:
3770 g_value_set_boolean (value, queue->temp_remove);
3772 case PROP_RING_BUFFER_MAX_SIZE:
3773 g_value_set_uint64 (value, queue->ring_buffer_max_size);
3775 case PROP_AVG_IN_RATE:
3777 gdouble in_rate = queue->byte_in_rate;
3779 /* During the first RATE_INTERVAL, byte_in_rate will not have been
3780 * calculated, so calculate it here. */
3781 if (in_rate == 0.0 && queue->bytes_in
3782 && queue->last_update_in_rates_elapsed > 0.0)
3783 in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
3785 g_value_set_int64 (value, (gint64) in_rate);
3789 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3793 GST_QUEUE2_MUTEX_UNLOCK (queue);