2 * Copyright (C) 2014 Wim Taymans <wim.taymans@gmail.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
23 * SECTION:element-downloadbuffer
25 * The downloadbuffer element provides on-disk buffering and caching of, typically,
26 * a network file. temp-template should be set to a value such as
27 * /tmp/gstreamer-XXXXXX and the element will allocate a random free filename and
28 * buffer the data in the file.
30 * With max-size-bytes and max-size-time you can configure the buffering limits.
31 * The downloadbuffer element will try to read-ahead these amounts of data. When
32 * the amount of read-ahead data drops below low-percent of the configured max,
33 * the element will start emiting BUFFERING messages until high-percent of max is
36 * The downloadbuffer provides push and pull based scheduling on its source pad
37 * and will efficiently seek in the upstream element when needed.
39 * The temp-location property will be used to notify the application of the
42 * When the downloadbuffer has completely downloaded the media, it will
43 * post an application message named <classname>"GstCacheDownloadComplete"</classname>
44 * with the following information:
53 #include "gstdownloadbuffer.h"
55 #include <glib/gstdio.h>
58 #include "gst/gst-i18n-lib.h"
59 #include "gst/glib-compat-private.h"
63 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
68 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
73 GST_DEBUG_CATEGORY_STATIC (downloadbuffer_debug);
74 #define GST_CAT_DEFAULT (downloadbuffer_debug)
82 #define DEFAULT_BUFFER_SIZE 4096
84 /* default property values */
85 #define DEFAULT_MAX_SIZE_BYTES (2 * 1024 * 1024) /* 2 MB */
86 #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND /* 2 seconds */
87 #define DEFAULT_LOW_PERCENT 10
88 #define DEFAULT_HIGH_PERCENT 99
89 #define DEFAULT_TEMP_REMOVE TRUE
104 #define GST_DOWNLOAD_BUFFER_CLEAR_LEVEL(l) G_STMT_START { \
109 #define STATUS(elem, pad, msg) \
110 GST_LOG_OBJECT (elem, "(%s:%s) " msg ": %u of %u " \
111 "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
113 GST_DEBUG_PAD_NAME (pad), \
114 elem->cur_level.bytes, \
115 elem->max_level.bytes, \
116 elem->cur_level.time, \
117 elem->max_level.time)
119 #define GST_DOWNLOAD_BUFFER_MUTEX_LOCK(q) G_STMT_START { \
120 g_mutex_lock (&q->qlock); \
123 #define GST_DOWNLOAD_BUFFER_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START { \
124 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (q); \
125 if (res != GST_FLOW_OK) \
129 #define GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK(q) G_STMT_START { \
130 g_mutex_unlock (&q->qlock); \
133 #define GST_DOWNLOAD_BUFFER_WAIT_ADD_CHECK(q, res, o, label) G_STMT_START { \
134 STATUS (q, q->srcpad, "wait for ADD"); \
135 q->waiting_add = TRUE; \
136 q->waiting_offset = o; \
137 g_cond_wait (&q->item_add, &q->qlock); \
138 q->waiting_add = FALSE; \
139 if (res != GST_FLOW_OK) { \
140 STATUS (q, q->srcpad, "received ADD wakeup"); \
143 STATUS (q, q->srcpad, "received ADD"); \
146 #define GST_DOWNLOAD_BUFFER_SIGNAL_ADD(q, o) G_STMT_START { \
147 if (q->waiting_add && q->waiting_offset <= o) { \
148 STATUS (q, q->sinkpad, "signal ADD"); \
149 g_cond_signal (&q->item_add); \
154 GST_DEBUG_CATEGORY_INIT (downloadbuffer_debug, "downloadbuffer", 0, \
155 "downloadbuffer element");
157 #define gst_download_buffer_parent_class parent_class
158 G_DEFINE_TYPE_WITH_CODE (GstDownloadBuffer, gst_download_buffer,
159 GST_TYPE_ELEMENT, _do_init);
161 static void update_buffering (GstDownloadBuffer * dlbuf);
163 static void gst_download_buffer_finalize (GObject * object);
165 static void gst_download_buffer_set_property (GObject * object,
166 guint prop_id, const GValue * value, GParamSpec * pspec);
167 static void gst_download_buffer_get_property (GObject * object,
168 guint prop_id, GValue * value, GParamSpec * pspec);
170 static GstFlowReturn gst_download_buffer_chain (GstPad * pad,
171 GstObject * parent, GstBuffer * buffer);
172 static void gst_download_buffer_loop (GstPad * pad);
174 static gboolean gst_download_buffer_handle_sink_event (GstPad * pad,
175 GstObject * parent, GstEvent * event);
176 static gboolean gst_download_buffer_handle_sink_query (GstPad * pad,
177 GstObject * parent, GstQuery * query);
179 static gboolean gst_download_buffer_handle_src_event (GstPad * pad,
180 GstObject * parent, GstEvent * event);
181 static gboolean gst_download_buffer_handle_src_query (GstPad * pad,
182 GstObject * parent, GstQuery * query);
183 static gboolean gst_download_buffer_handle_query (GstElement * element,
186 static GstFlowReturn gst_download_buffer_get_range (GstPad * pad,
187 GstObject * parent, guint64 offset, guint length, GstBuffer ** buffer);
189 static gboolean gst_download_buffer_src_activate_mode (GstPad * pad,
190 GstObject * parent, GstPadMode mode, gboolean active);
191 static gboolean gst_download_buffer_sink_activate_mode (GstPad * pad,
192 GstObject * parent, GstPadMode mode, gboolean active);
193 static GstStateChangeReturn gst_download_buffer_change_state (GstElement *
194 element, GstStateChange transition);
196 /* static guint gst_download_buffer_signals[LAST_SIGNAL] = { 0 }; */
199 gst_download_buffer_class_init (GstDownloadBufferClass * klass)
201 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
202 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
204 gobject_class->set_property = gst_download_buffer_set_property;
205 gobject_class->get_property = gst_download_buffer_get_property;
208 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
209 g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
210 "Max. amount of data to buffer (bytes, 0=disable)",
211 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
212 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
213 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
214 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
215 "Max. amount of data to buffer (in ns, 0=disable)", 0, G_MAXUINT64,
216 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
218 g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
219 g_param_spec_int ("low-percent", "Low percent",
220 "Low threshold for buffering to start. Only used if use-buffering is True",
221 0, 100, DEFAULT_LOW_PERCENT,
222 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
223 g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
224 g_param_spec_int ("high-percent", "High percent",
225 "High threshold for buffering to finish. Only used if use-buffering is True",
226 0, 100, DEFAULT_HIGH_PERCENT,
227 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
229 g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
230 g_param_spec_string ("temp-template", "Temporary File Template",
231 "File template to store temporary files in, should contain directory "
232 "and XXXXXX. (NULL == disabled)",
233 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
235 g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
236 g_param_spec_string ("temp-location", "Temporary File Location",
237 "Location to store temporary files in (Only read this property, "
238 "use temp-template to configure the name template)",
239 NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
242 * GstDownloadBuffer:temp-remove
244 * When temp-template is set, remove the temporary file when going to READY.
246 g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
247 g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
248 "Remove the temp-location after use",
249 DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
251 /* set several parent class virtual functions */
252 gobject_class->finalize = gst_download_buffer_finalize;
254 gst_element_class_add_pad_template (gstelement_class,
255 gst_static_pad_template_get (&srctemplate));
256 gst_element_class_add_pad_template (gstelement_class,
257 gst_static_pad_template_get (&sinktemplate));
259 gst_element_class_set_static_metadata (gstelement_class, "DownloadBuffer",
260 "Generic", "Download Buffer element",
261 "Wim Taymans <wim.taymans@gmail.com>");
263 gstelement_class->change_state =
264 GST_DEBUG_FUNCPTR (gst_download_buffer_change_state);
265 gstelement_class->query =
266 GST_DEBUG_FUNCPTR (gst_download_buffer_handle_query);
270 gst_download_buffer_init (GstDownloadBuffer * dlbuf)
272 dlbuf->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
274 gst_pad_set_chain_function (dlbuf->sinkpad,
275 GST_DEBUG_FUNCPTR (gst_download_buffer_chain));
276 gst_pad_set_activatemode_function (dlbuf->sinkpad,
277 GST_DEBUG_FUNCPTR (gst_download_buffer_sink_activate_mode));
278 gst_pad_set_event_function (dlbuf->sinkpad,
279 GST_DEBUG_FUNCPTR (gst_download_buffer_handle_sink_event));
280 gst_pad_set_query_function (dlbuf->sinkpad,
281 GST_DEBUG_FUNCPTR (gst_download_buffer_handle_sink_query));
282 GST_PAD_SET_PROXY_CAPS (dlbuf->sinkpad);
283 gst_element_add_pad (GST_ELEMENT (dlbuf), dlbuf->sinkpad);
285 dlbuf->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
287 gst_pad_set_activatemode_function (dlbuf->srcpad,
288 GST_DEBUG_FUNCPTR (gst_download_buffer_src_activate_mode));
289 gst_pad_set_getrange_function (dlbuf->srcpad,
290 GST_DEBUG_FUNCPTR (gst_download_buffer_get_range));
291 gst_pad_set_event_function (dlbuf->srcpad,
292 GST_DEBUG_FUNCPTR (gst_download_buffer_handle_src_event));
293 gst_pad_set_query_function (dlbuf->srcpad,
294 GST_DEBUG_FUNCPTR (gst_download_buffer_handle_src_query));
295 GST_PAD_SET_PROXY_CAPS (dlbuf->srcpad);
296 gst_element_add_pad (GST_ELEMENT (dlbuf), dlbuf->srcpad);
299 GST_DOWNLOAD_BUFFER_CLEAR_LEVEL (dlbuf->cur_level);
300 dlbuf->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
301 dlbuf->max_level.time = DEFAULT_MAX_SIZE_TIME;
302 dlbuf->low_percent = DEFAULT_LOW_PERCENT;
303 dlbuf->high_percent = DEFAULT_HIGH_PERCENT;
305 dlbuf->srcresult = GST_FLOW_FLUSHING;
306 dlbuf->sinkresult = GST_FLOW_FLUSHING;
307 dlbuf->in_timer = g_timer_new ();
308 dlbuf->out_timer = g_timer_new ();
310 g_mutex_init (&dlbuf->qlock);
311 dlbuf->waiting_add = FALSE;
312 g_cond_init (&dlbuf->item_add);
314 dlbuf->buffering_percent = 100;
316 /* tempfile related */
317 dlbuf->temp_template = NULL;
318 dlbuf->temp_location = NULL;
319 dlbuf->temp_remove = DEFAULT_TEMP_REMOVE;
322 /* called only once, as opposed to dispose */
324 gst_download_buffer_finalize (GObject * object)
326 GstDownloadBuffer *dlbuf = GST_DOWNLOAD_BUFFER (object);
328 g_mutex_clear (&dlbuf->qlock);
329 g_cond_clear (&dlbuf->item_add);
330 g_timer_destroy (dlbuf->in_timer);
331 g_timer_destroy (dlbuf->out_timer);
333 /* temp_file path cleanup */
334 g_free (dlbuf->temp_template);
335 g_free (dlbuf->temp_location);
337 G_OBJECT_CLASS (parent_class)->finalize (object);
341 reset_rate_timer (GstDownloadBuffer * dlbuf)
344 dlbuf->bytes_out = 0;
345 dlbuf->byte_in_rate = 0.0;
346 dlbuf->byte_in_period = 0;
347 dlbuf->byte_out_rate = 0.0;
348 dlbuf->last_in_elapsed = 0.0;
349 dlbuf->last_out_elapsed = 0.0;
350 dlbuf->in_timer_started = FALSE;
351 dlbuf->out_timer_started = FALSE;
354 /* the interval in seconds to recalculate the rate */
355 #define RATE_INTERVAL 0.2
356 /* Tuning for rate estimation. We use a large window for the input rate because
357 * it should be stable when connected to a network. The output rate is less
358 * stable (the elements preroll, queues behind a demuxer fill, ...) and should
359 * therefore adapt more quickly.
360 * However, initial input rate may be subject to a burst, and should therefore
361 * initially also adapt more quickly to changes, and only later on give higher
362 * weight to previous values. */
363 #define AVG_IN(avg,val,w1,w2) ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
364 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
367 update_time_level (GstDownloadBuffer * dlbuf)
369 if (dlbuf->byte_in_rate > 0.0) {
370 dlbuf->cur_level.time =
371 dlbuf->cur_level.bytes / dlbuf->byte_in_rate * GST_SECOND;
373 GST_DEBUG ("levels: bytes %u/%u, time %" GST_TIME_FORMAT "/%" GST_TIME_FORMAT,
374 dlbuf->cur_level.bytes, dlbuf->max_level.bytes,
375 GST_TIME_ARGS (dlbuf->cur_level.time),
376 GST_TIME_ARGS (dlbuf->max_level.time));
377 /* update the buffering */
378 update_buffering (dlbuf);
382 update_levels (GstDownloadBuffer * dlbuf, guint bytes)
384 dlbuf->cur_level.bytes = bytes;
385 update_time_level (dlbuf);
389 update_in_rates (GstDownloadBuffer * dlbuf)
391 gdouble elapsed, period;
392 gdouble byte_in_rate;
394 if (!dlbuf->in_timer_started) {
395 dlbuf->in_timer_started = TRUE;
396 g_timer_start (dlbuf->in_timer);
400 elapsed = g_timer_elapsed (dlbuf->in_timer, NULL);
402 /* recalc after each interval. */
403 if (dlbuf->last_in_elapsed + RATE_INTERVAL < elapsed) {
404 period = elapsed - dlbuf->last_in_elapsed;
406 GST_DEBUG_OBJECT (dlbuf,
407 "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
408 period, dlbuf->bytes_in, dlbuf->byte_in_period);
410 byte_in_rate = dlbuf->bytes_in / period;
412 if (dlbuf->byte_in_rate == 0.0)
413 dlbuf->byte_in_rate = byte_in_rate;
415 dlbuf->byte_in_rate = AVG_IN (dlbuf->byte_in_rate, byte_in_rate,
416 (double) dlbuf->byte_in_period, period);
418 /* another data point, cap at 16 for long time running average */
419 if (dlbuf->byte_in_period < 16 * RATE_INTERVAL)
420 dlbuf->byte_in_period += period;
422 /* reset the values to calculate rate over the next interval */
423 dlbuf->last_in_elapsed = elapsed;
425 GST_DEBUG_OBJECT (dlbuf, "rates: in %f", dlbuf->byte_in_rate);
430 update_out_rates (GstDownloadBuffer * dlbuf)
432 gdouble elapsed, period;
433 gdouble byte_out_rate;
435 if (!dlbuf->out_timer_started) {
436 dlbuf->out_timer_started = TRUE;
437 g_timer_start (dlbuf->out_timer);
441 elapsed = g_timer_elapsed (dlbuf->out_timer, NULL);
443 /* recalc after each interval. */
444 if (dlbuf->last_out_elapsed + RATE_INTERVAL < elapsed) {
445 period = elapsed - dlbuf->last_out_elapsed;
447 GST_DEBUG_OBJECT (dlbuf,
448 "rates: period %f, out %" G_GUINT64_FORMAT, period, dlbuf->bytes_out);
450 byte_out_rate = dlbuf->bytes_out / period;
452 if (dlbuf->byte_out_rate == 0.0)
453 dlbuf->byte_out_rate = byte_out_rate;
455 dlbuf->byte_out_rate = AVG_OUT (dlbuf->byte_out_rate, byte_out_rate);
457 /* reset the values to calculate rate over the next interval */
458 dlbuf->last_out_elapsed = elapsed;
459 dlbuf->bytes_out = 0;
460 GST_DEBUG_OBJECT (dlbuf, "rates: out %f", dlbuf->byte_out_rate);
465 get_buffering_percent (GstDownloadBuffer * dlbuf, gboolean * is_buffering,
470 if (dlbuf->high_percent <= 0) {
474 *is_buffering = FALSE;
478 /* Ensure the variables used to calculate buffering state are up-to-date. */
479 update_in_rates (dlbuf);
480 update_out_rates (dlbuf);
482 /* figure out the percent we are filled, we take the max of all formats. */
483 if (dlbuf->max_level.bytes > 0) {
484 if (dlbuf->cur_level.bytes >= dlbuf->max_level.bytes)
487 perc = dlbuf->cur_level.bytes * 100 / dlbuf->max_level.bytes;
491 if (dlbuf->max_level.time > 0) {
492 if (dlbuf->cur_level.time >= dlbuf->max_level.time)
495 perc = MAX (perc, dlbuf->cur_level.time * 100 / dlbuf->max_level.time);
497 perc = MAX (0, perc);
500 *is_buffering = dlbuf->is_buffering;
502 /* scale to high percent so that it becomes the 100% mark */
503 perc = perc * 100 / dlbuf->high_percent;
511 GST_DEBUG_OBJECT (dlbuf, "buffering %d, percent %d", dlbuf->is_buffering,
518 get_buffering_stats (GstDownloadBuffer * dlbuf, gint percent,
519 GstBufferingMode * mode, gint * avg_in, gint * avg_out,
520 gint64 * buffering_left)
523 *mode = GST_BUFFERING_DOWNLOAD;
526 *avg_in = dlbuf->byte_in_rate;
528 *avg_out = dlbuf->byte_out_rate;
530 if (buffering_left) {
533 *buffering_left = (percent == 100 ? 0 : -1);
535 max = dlbuf->max_level.time;
536 cur = dlbuf->cur_level.time;
538 if (percent != 100 && max > cur)
539 *buffering_left = (max - cur) / 1000000;
544 update_buffering (GstDownloadBuffer * dlbuf)
547 gboolean post = FALSE;
549 if (!get_buffering_percent (dlbuf, NULL, &percent))
552 if (dlbuf->is_buffering) {
554 /* if we were buffering see if we reached the high watermark */
555 if (percent >= dlbuf->high_percent)
556 dlbuf->is_buffering = FALSE;
558 /* we were not buffering, check if we need to start buffering if we drop
559 * below the low threshold */
560 if (percent < dlbuf->low_percent) {
561 dlbuf->is_buffering = TRUE;
567 if (percent == dlbuf->buffering_percent)
570 dlbuf->buffering_percent = percent;
575 GstBufferingMode mode;
576 gint avg_in, avg_out;
577 gint64 buffering_left;
579 get_buffering_stats (dlbuf, percent, &mode, &avg_in, &avg_out,
582 message = gst_message_new_buffering (GST_OBJECT_CAST (dlbuf),
584 gst_message_set_buffering_stats (message, mode,
585 avg_in, avg_out, buffering_left);
587 gst_element_post_message (GST_ELEMENT_CAST (dlbuf), message);
592 perform_seek_to_offset (GstDownloadBuffer * dlbuf, guint64 offset)
600 /* until we receive the FLUSH_STOP from this seek, we skip data */
601 dlbuf->seeking = TRUE;
602 dlbuf->write_pos = offset;
603 dlbuf->filling = FALSE;
604 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
606 GST_DEBUG_OBJECT (dlbuf, "Seeking to %" G_GUINT64_FORMAT, offset);
609 gst_event_new_seek (1.0, GST_FORMAT_BYTES,
610 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
611 GST_SEEK_TYPE_NONE, -1);
613 res = gst_pad_push_event (dlbuf->sinkpad, event);
614 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
619 /* get the threshold for when we decide to seek rather than wait */
621 get_seek_threshold (GstDownloadBuffer * dlbuf)
625 /* FIXME, find a good threshold based on the incoming rate. */
626 threshold = 1024 * 512;
632 gst_download_buffer_update_upstream_size (GstDownloadBuffer * dlbuf)
634 gint64 upstream_size = 0;
636 if (gst_pad_peer_query_duration (dlbuf->sinkpad, GST_FORMAT_BYTES,
638 GST_INFO_OBJECT (dlbuf, "upstream size: %" G_GINT64_FORMAT, upstream_size);
639 dlbuf->upstream_size = upstream_size;
643 /* called with DOWNLOAD_BUFFER_MUTEX */
645 gst_download_buffer_wait_for_data (GstDownloadBuffer * dlbuf, guint64 offset,
652 GST_DEBUG_OBJECT (dlbuf, "wait for %" G_GUINT64_FORMAT ", length %u",
655 wanted = offset + length;
657 /* pause the timer while we wait. The fact that we are waiting does not mean
658 * the byterate on the output pad is lower */
659 if ((started = dlbuf->out_timer_started))
660 g_timer_stop (dlbuf->out_timer);
662 /* check range before us */
663 if (gst_sparse_file_get_range_before (dlbuf->file, offset, &start, &stop)) {
664 GST_DEBUG_OBJECT (dlbuf,
665 "range before %" G_GSIZE_FORMAT " - %" G_GSIZE_FORMAT, start, stop);
666 if (start <= offset && offset < stop) {
667 GST_DEBUG_OBJECT (dlbuf, "we have the offset");
668 /* we have the range, continue it */
671 guint64 threshold, dist;
673 /* there is a range before us, check how far away it is */
674 threshold = get_seek_threshold (dlbuf);
675 dist = offset - stop;
677 if (dist <= threshold) {
678 GST_DEBUG_OBJECT (dlbuf, "not too far");
679 /* not far away, continue it */
685 if (dlbuf->write_pos != offset)
686 perform_seek_to_offset (dlbuf, offset);
688 dlbuf->filling = TRUE;
689 if (dlbuf->write_pos > dlbuf->read_pos)
690 update_levels (dlbuf, dlbuf->write_pos - dlbuf->read_pos);
692 update_levels (dlbuf, 0);
694 /* now wait for more data */
695 GST_DEBUG_OBJECT (dlbuf, "waiting for more data");
696 GST_DOWNLOAD_BUFFER_WAIT_ADD_CHECK (dlbuf, dlbuf->srcresult, wanted,
698 GST_DEBUG_OBJECT (dlbuf, "got more data");
700 /* and continue if we were running before */
702 g_timer_continue (dlbuf->out_timer);
708 GST_DEBUG_OBJECT (dlbuf, "we are flushing");
709 return GST_FLOW_FLUSHING;
714 check_upstream_size (GstDownloadBuffer * dlbuf, gsize offset, guint * length)
716 gsize stop = offset + *length;
717 /* catch any reads beyond the size of the file here to make sure cache
718 * doesn't send seek events beyond the size of the file upstream, since
719 * that would confuse elements such as souphttpsrc and/or http servers.
720 * Demuxers often just loop until EOS at the end of the file to figure out
721 * when they've read all the end-headers or index chunks. */
722 if (G_UNLIKELY (dlbuf->upstream_size == -1 || stop >= dlbuf->upstream_size)) {
723 gst_download_buffer_update_upstream_size (dlbuf);
726 if (dlbuf->upstream_size != -1) {
727 if (offset >= dlbuf->upstream_size)
730 if (G_UNLIKELY (stop > dlbuf->upstream_size)) {
731 *length = dlbuf->upstream_size - offset;
732 GST_DEBUG_OBJECT (dlbuf, "adjusting length downto %u", *length);
738 /* called with DOWNLOAD_BUFFER_MUTEX */
740 gst_download_buffer_read_buffer (GstDownloadBuffer * dlbuf, guint64 offset,
741 guint length, GstBuffer ** buffer)
745 GstFlowReturn ret = GST_FLOW_OK;
746 gsize res, remaining;
747 GError *error = NULL;
749 length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
750 offset = (offset == -1) ? dlbuf->read_pos : offset;
752 if (!check_upstream_size (dlbuf, offset, &length))
755 /* allocate the output buffer of the requested size */
757 buf = gst_buffer_new_allocate (NULL, length, NULL);
761 gst_buffer_map (buf, &info, GST_MAP_WRITE);
763 GST_DEBUG_OBJECT (dlbuf, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
766 dlbuf->read_pos = offset;
770 gst_sparse_file_read (dlbuf->file, offset, info.data, length,
772 if (G_UNLIKELY (res == 0)) {
773 switch (error->code) {
774 case G_IO_ERROR_WOULD_BLOCK:
775 /* we don't have the requested data in the file, decide what to
777 ret = gst_download_buffer_wait_for_data (dlbuf, offset, length);
778 if (ret != GST_FLOW_OK)
784 g_clear_error (&error);
788 gst_buffer_unmap (buf, &info);
789 gst_buffer_resize (buf, 0, res);
791 dlbuf->bytes_out += res;
792 dlbuf->read_pos += res;
794 GST_DEBUG_OBJECT (dlbuf,
795 "Read %" G_GSIZE_FORMAT " bytes, remaining %" G_GSIZE_FORMAT, res,
798 if (dlbuf->read_pos + remaining == dlbuf->upstream_size)
799 update_levels (dlbuf, dlbuf->max_level.bytes);
801 update_levels (dlbuf, remaining);
803 GST_BUFFER_OFFSET (buf) = offset;
804 GST_BUFFER_OFFSET_END (buf) = offset + res;
813 GST_DEBUG_OBJECT (dlbuf, "EOS hit");
818 GST_DEBUG_OBJECT (dlbuf, "we are flushing");
819 gst_buffer_unmap (buf, &info);
821 gst_buffer_unref (buf);
822 return GST_FLOW_FLUSHING;
826 GST_DEBUG_OBJECT (dlbuf, "we have a read error: %s", error->message);
827 g_clear_error (&error);
828 gst_buffer_unmap (buf, &info);
830 gst_buffer_unref (buf);
835 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
836 * the temp filename. */
838 gst_download_buffer_open_temp_location_file (GstDownloadBuffer * dlbuf)
846 GST_DEBUG_OBJECT (dlbuf, "opening temp file %s", dlbuf->temp_template);
848 /* If temp_template was set, allocate a filename and open that filen */
851 if (dlbuf->temp_template == NULL)
854 /* make copy of the template, we don't want to change this */
855 name = g_strdup (dlbuf->temp_template);
856 fd = g_mkstemp (name);
860 /* open the file for update/writing */
861 dlbuf->file = gst_sparse_file_new ();
862 /* error creating file */
863 if (!gst_sparse_file_set_fd (dlbuf->file, fd))
866 g_free (dlbuf->temp_location);
867 dlbuf->temp_location = name;
870 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
872 /* we can't emit the notify with the lock */
873 g_object_notify (G_OBJECT (dlbuf), "temp-location");
875 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
877 GST_DEBUG_OBJECT (dlbuf, "opened temp file %s", dlbuf->temp_template);
884 GST_DEBUG_OBJECT (dlbuf, "temp file was already open");
889 GST_ELEMENT_ERROR (dlbuf, RESOURCE, NOT_FOUND,
890 (_("No Temp directory specified.")), (NULL));
895 GST_ELEMENT_ERROR (dlbuf, RESOURCE, OPEN_READ,
896 (_("Could not create temp file \"%s\"."), dlbuf->temp_template),
903 GST_ELEMENT_ERROR (dlbuf, RESOURCE, OPEN_READ,
904 (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
913 gst_download_buffer_close_temp_location_file (GstDownloadBuffer * dlbuf)
916 if (dlbuf->file == NULL)
919 GST_DEBUG_OBJECT (dlbuf, "closing sparse file");
921 if (dlbuf->temp_remove) {
922 if (remove (dlbuf->temp_location) < 0) {
923 GST_WARNING_OBJECT (dlbuf, "Failed to remove temporary file %s: %s",
924 dlbuf->temp_location, g_strerror (errno));
927 gst_sparse_file_free (dlbuf->file);
928 close (dlbuf->temp_fd);
933 gst_download_buffer_flush_temp_file (GstDownloadBuffer * dlbuf)
935 if (dlbuf->file == NULL)
938 GST_DEBUG_OBJECT (dlbuf, "flushing temp file");
940 gst_sparse_file_clear (dlbuf->file);
944 gst_download_buffer_locked_flush (GstDownloadBuffer * dlbuf, gboolean full,
948 gst_download_buffer_flush_temp_file (dlbuf);
949 GST_DOWNLOAD_BUFFER_CLEAR_LEVEL (dlbuf->cur_level);
950 gst_event_replace (&dlbuf->stream_start_event, NULL);
951 gst_event_replace (&dlbuf->segment_event, NULL);
955 gst_download_buffer_handle_sink_event (GstPad * pad, GstObject * parent,
959 GstDownloadBuffer *dlbuf;
961 dlbuf = GST_DOWNLOAD_BUFFER (parent);
963 switch (GST_EVENT_TYPE (event)) {
964 case GST_EVENT_FLUSH_START:
966 GST_LOG_OBJECT (dlbuf, "received flush start event");
967 if (GST_PAD_MODE (dlbuf->srcpad) == GST_PAD_MODE_PUSH) {
969 ret = gst_pad_push_event (dlbuf->srcpad, event);
971 /* now unblock the chain function */
972 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
973 dlbuf->srcresult = GST_FLOW_FLUSHING;
974 dlbuf->sinkresult = GST_FLOW_FLUSHING;
975 /* unblock the loop and chain functions */
976 GST_DOWNLOAD_BUFFER_SIGNAL_ADD (dlbuf, -1);
977 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
979 /* make sure it pauses, this should happen since we sent
980 * flush_start downstream. */
981 gst_pad_pause_task (dlbuf->srcpad);
982 GST_LOG_OBJECT (dlbuf, "loop stopped");
984 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
985 /* flush the sink pad */
986 dlbuf->sinkresult = GST_FLOW_FLUSHING;
987 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
989 gst_event_unref (event);
993 case GST_EVENT_FLUSH_STOP:
995 GST_LOG_OBJECT (dlbuf, "received flush stop event");
997 if (GST_PAD_MODE (dlbuf->srcpad) == GST_PAD_MODE_PUSH) {
999 ret = gst_pad_push_event (dlbuf->srcpad, event);
1001 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1002 gst_download_buffer_locked_flush (dlbuf, FALSE, TRUE);
1003 dlbuf->srcresult = GST_FLOW_OK;
1004 dlbuf->sinkresult = GST_FLOW_OK;
1005 dlbuf->unexpected = FALSE;
1006 dlbuf->seeking = FALSE;
1007 /* reset rate counters */
1008 reset_rate_timer (dlbuf);
1009 gst_pad_start_task (dlbuf->srcpad,
1010 (GstTaskFunction) gst_download_buffer_loop, dlbuf->srcpad, NULL);
1011 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1013 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1014 dlbuf->unexpected = FALSE;
1015 dlbuf->sinkresult = GST_FLOW_OK;
1016 dlbuf->seeking = FALSE;
1017 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1019 gst_event_unref (event);
1024 if (GST_EVENT_IS_SERIALIZED (event)) {
1025 /* serialized events go in the buffer */
1026 GST_DOWNLOAD_BUFFER_MUTEX_LOCK_CHECK (dlbuf, dlbuf->sinkresult,
1028 switch (GST_EVENT_TYPE (event)) {
1030 GST_DEBUG_OBJECT (dlbuf, "we have EOS");
1031 /* Zero the thresholds, this makes sure the dlbuf is completely
1032 * filled and we can read all data from the dlbuf. */
1033 /* update the buffering status */
1034 update_levels (dlbuf, dlbuf->max_level.bytes);
1036 case GST_EVENT_SEGMENT:
1037 gst_event_replace (&dlbuf->segment_event, event);
1038 /* a new segment allows us to accept more buffers if we got EOS
1039 * from downstream */
1040 dlbuf->unexpected = FALSE;
1042 case GST_EVENT_STREAM_START:
1043 gst_event_replace (&dlbuf->stream_start_event, event);
1048 gst_event_unref (event);
1049 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1051 /* non-serialized events are passed upstream. */
1052 ret = gst_pad_push_event (dlbuf->srcpad, event);
1061 GST_DEBUG_OBJECT (dlbuf, "refusing event, we are flushing");
1062 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1063 gst_event_unref (event);
1069 gst_download_buffer_handle_sink_query (GstPad * pad, GstObject * parent,
1072 GstDownloadBuffer *dlbuf;
1075 dlbuf = GST_DOWNLOAD_BUFFER (parent);
1077 switch (GST_QUERY_TYPE (query)) {
1079 if (GST_QUERY_IS_SERIALIZED (query)) {
1080 GST_LOG_OBJECT (dlbuf, "received query %p", query);
1081 GST_DEBUG_OBJECT (dlbuf, "refusing query, we are not using the dlbuf");
1084 res = gst_pad_query_default (pad, parent, query);
1091 static GstFlowReturn
1092 gst_download_buffer_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1094 GstDownloadBuffer *dlbuf;
1097 gsize res, available;
1098 GError *error = NULL;
1100 dlbuf = GST_DOWNLOAD_BUFFER (parent);
1102 GST_LOG_OBJECT (dlbuf, "received buffer %p of "
1103 "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
1104 GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
1105 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
1106 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
1108 /* we have to lock the dlbuf since we span threads */
1109 GST_DOWNLOAD_BUFFER_MUTEX_LOCK_CHECK (dlbuf, dlbuf->sinkresult, out_flushing);
1110 /* when we received unexpected from downstream, refuse more buffers */
1111 if (dlbuf->unexpected)
1114 /* while we didn't receive the newsegment, we're seeking and we skip data */
1118 /* put buffer in dlbuf now */
1119 offset = dlbuf->write_pos;
1122 if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1123 GST_BUFFER_OFFSET (buffer) != offset) {
1124 GST_WARNING_OBJECT (dlbuf, "buffer offset does not match current writing "
1125 "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1126 GST_BUFFER_OFFSET (buffer), offset);
1129 gst_buffer_map (buffer, &info, GST_MAP_READ);
1131 GST_DEBUG_OBJECT (dlbuf, "Writing %" G_GSIZE_FORMAT " bytes to %"
1132 G_GUINT64_FORMAT, info.size, offset);
1135 gst_sparse_file_write (dlbuf->file, offset, info.data, info.size,
1136 &available, &error);
1140 dlbuf->write_pos = offset + info.size;
1141 dlbuf->bytes_in += info.size;
1143 GST_DOWNLOAD_BUFFER_SIGNAL_ADD (dlbuf, dlbuf->write_pos + available);
1145 /* we hit the end, see what to do */
1146 if (dlbuf->write_pos + available == dlbuf->upstream_size) {
1149 /* we have everything up to the end, find a region to fill */
1150 if (gst_sparse_file_get_range_after (dlbuf->file, 0, &start, &stop)) {
1151 if (stop < dlbuf->upstream_size) {
1152 /* a hole to fill, seek to its end */
1153 perform_seek_to_offset (dlbuf, stop);
1155 /* we filled all the holes, post a message */
1156 dlbuf->filling = FALSE;
1157 update_levels (dlbuf, dlbuf->max_level.bytes);
1158 gst_element_post_message (GST_ELEMENT_CAST (dlbuf),
1159 gst_message_new_element (GST_OBJECT_CAST (dlbuf),
1160 gst_structure_new ("GstCacheDownloadComplete",
1161 "location", G_TYPE_STRING, dlbuf->temp_location, NULL)));
1165 /* see if we need to skip this region or just read it again. The idea
1166 * is that when the region is not big, we want to avoid a seek and just
1168 guint64 threshold = get_seek_threshold (dlbuf);
1170 if (available > threshold) {
1171 /* further than threshold, it's better to skip than to reread */
1172 perform_seek_to_offset (dlbuf, dlbuf->write_pos + available);
1175 if (dlbuf->filling) {
1176 if (dlbuf->write_pos > dlbuf->read_pos)
1177 update_levels (dlbuf, dlbuf->write_pos - dlbuf->read_pos);
1179 update_levels (dlbuf, 0);
1182 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1184 gst_buffer_unmap (buffer, &info);
1185 gst_buffer_unref (buffer);
1192 GstFlowReturn ret = dlbuf->sinkresult;
1193 GST_LOG_OBJECT (dlbuf,
1194 "exit because task paused, reason: %s", gst_flow_get_name (ret));
1195 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1196 gst_buffer_unref (buffer);
1201 GST_LOG_OBJECT (dlbuf, "exit because we received EOS");
1202 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1203 gst_buffer_unref (buffer);
1204 return GST_FLOW_EOS;
1208 GST_LOG_OBJECT (dlbuf, "exit because we are seeking");
1209 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1210 gst_buffer_unref (buffer);
1215 gst_buffer_unmap (buffer, &info);
1216 gst_buffer_unref (buffer);
1217 GST_ELEMENT_ERROR (dlbuf, RESOURCE, WRITE,
1218 (_("Error while writing to download file.")), ("%s", error->message));
1219 g_clear_error (&error);
1220 return GST_FLOW_ERROR;
1224 /* called repeatedly with @pad as the source pad. This function should push out
1225 * data to the peer element. */
1227 gst_download_buffer_loop (GstPad * pad)
1229 GstDownloadBuffer *dlbuf;
1231 GstBuffer *buffer = NULL;
1233 dlbuf = GST_DOWNLOAD_BUFFER (GST_PAD_PARENT (pad));
1235 /* have to lock for thread-safety */
1236 GST_DOWNLOAD_BUFFER_MUTEX_LOCK_CHECK (dlbuf, dlbuf->srcresult, out_flushing);
1238 if (dlbuf->stream_start_event != NULL) {
1239 gst_pad_push_event (dlbuf->srcpad, dlbuf->stream_start_event);
1240 dlbuf->stream_start_event = NULL;
1242 if (dlbuf->segment_event != NULL) {
1243 gst_pad_push_event (dlbuf->srcpad, dlbuf->segment_event);
1244 dlbuf->segment_event = NULL;
1247 ret = gst_download_buffer_read_buffer (dlbuf, -1, -1, &buffer);
1248 if (ret != GST_FLOW_OK)
1251 g_atomic_int_set (&dlbuf->downstream_may_block, 1);
1252 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1254 ret = gst_pad_push (dlbuf->srcpad, buffer);
1255 g_atomic_int_set (&dlbuf->downstream_may_block, 0);
1257 /* need to check for srcresult here as well */
1258 GST_DOWNLOAD_BUFFER_MUTEX_LOCK_CHECK (dlbuf, dlbuf->srcresult, out_flushing);
1259 dlbuf->srcresult = ret;
1260 dlbuf->sinkresult = ret;
1261 if (ret != GST_FLOW_OK)
1263 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1270 GstFlowReturn ret = dlbuf->srcresult;
1272 gst_pad_pause_task (dlbuf->srcpad);
1273 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1274 GST_LOG_OBJECT (dlbuf, "pause task, reason: %s", gst_flow_get_name (ret));
1275 /* let app know about us giving up if upstream is not expected to do so */
1276 if (ret == GST_FLOW_EOS) {
1277 /* FIXME perform EOS logic, this is really a basesrc operating on a
1279 gst_pad_push_event (dlbuf->srcpad, gst_event_new_eos ());
1280 } else if ((ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
1281 GST_ELEMENT_ERROR (dlbuf, STREAM, FAILED,
1282 (_("Internal data flow error.")),
1283 ("streaming task paused, reason %s (%d)",
1284 gst_flow_get_name (ret), ret));
1285 gst_pad_push_event (dlbuf->srcpad, gst_event_new_eos ());
1292 gst_download_buffer_handle_src_event (GstPad * pad, GstObject * parent,
1295 gboolean res = TRUE;
1296 GstDownloadBuffer *dlbuf = GST_DOWNLOAD_BUFFER (parent);
1298 #ifndef GST_DISABLE_GST_DEBUG
1299 GST_DEBUG_OBJECT (dlbuf, "got event %p (%s)",
1300 event, GST_EVENT_TYPE_NAME (event));
1303 switch (GST_EVENT_TYPE (event)) {
1304 case GST_EVENT_FLUSH_START:
1305 /* now unblock the getrange function */
1306 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1307 GST_DEBUG_OBJECT (dlbuf, "flushing");
1308 dlbuf->srcresult = GST_FLOW_FLUSHING;
1309 GST_DOWNLOAD_BUFFER_SIGNAL_ADD (dlbuf, -1);
1310 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1312 /* when using a temp file, we eat the event */
1314 gst_event_unref (event);
1316 case GST_EVENT_FLUSH_STOP:
1317 /* now unblock the getrange function */
1318 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1319 dlbuf->srcresult = GST_FLOW_OK;
1320 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1322 /* when using a temp file, we eat the event */
1324 gst_event_unref (event);
1326 case GST_EVENT_RECONFIGURE:
1327 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1328 /* assume downstream is linked now and try to push again */
1329 if (dlbuf->srcresult == GST_FLOW_NOT_LINKED) {
1330 dlbuf->srcresult = GST_FLOW_OK;
1331 dlbuf->sinkresult = GST_FLOW_OK;
1332 if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
1333 gst_pad_start_task (pad, (GstTaskFunction) gst_download_buffer_loop,
1337 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1339 res = gst_pad_push_event (dlbuf->sinkpad, event);
1342 res = gst_pad_push_event (dlbuf->sinkpad, event);
1350 gst_download_buffer_handle_src_query (GstPad * pad, GstObject * parent,
1353 GstDownloadBuffer *dlbuf;
1355 dlbuf = GST_DOWNLOAD_BUFFER (parent);
1357 switch (GST_QUERY_TYPE (query)) {
1358 case GST_QUERY_POSITION:
1363 if (!gst_pad_peer_query (dlbuf->sinkpad, query))
1366 /* get peer position */
1367 gst_query_parse_position (query, &format, &peer_pos);
1369 /* FIXME: this code assumes that there's no discont in the dlbuf */
1371 case GST_FORMAT_BYTES:
1372 peer_pos -= dlbuf->cur_level.bytes;
1374 case GST_FORMAT_TIME:
1375 peer_pos -= dlbuf->cur_level.time;
1378 GST_WARNING_OBJECT (dlbuf, "dropping query in %s format, don't "
1379 "know how to adjust value", gst_format_get_name (format));
1382 /* set updated position */
1383 gst_query_set_position (query, format, peer_pos);
1386 case GST_QUERY_DURATION:
1388 GST_DEBUG_OBJECT (dlbuf, "doing peer query");
1390 if (!gst_pad_peer_query (dlbuf->sinkpad, query))
1393 GST_DEBUG_OBJECT (dlbuf, "peer query success");
1396 case GST_QUERY_BUFFERING:
1399 gboolean is_buffering;
1400 GstBufferingMode mode;
1401 gint avg_in, avg_out;
1402 gint64 buffering_left;
1404 GST_DEBUG_OBJECT (dlbuf, "query buffering");
1406 get_buffering_percent (dlbuf, &is_buffering, &percent);
1407 gst_query_set_buffering_percent (query, is_buffering, percent);
1409 get_buffering_stats (dlbuf, percent, &mode, &avg_in, &avg_out,
1411 gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
1415 /* add ranges for download and ringbuffer buffering */
1419 gint64 estimated_total;
1421 gsize offset, range_start, range_stop;
1423 write_pos = dlbuf->write_pos;
1425 /* get duration of upstream in bytes */
1426 gst_download_buffer_update_upstream_size (dlbuf);
1427 duration = dlbuf->upstream_size;
1429 GST_DEBUG_OBJECT (dlbuf, "percent %d, duration %" G_GINT64_FORMAT
1430 ", writing %" G_GINT64_FORMAT, percent, duration, write_pos);
1432 /* calculate remaining and total download time */
1433 if (duration > write_pos && avg_in > 0.0)
1434 estimated_total = ((duration - write_pos) * 1000) / avg_in;
1436 estimated_total = -1;
1438 GST_DEBUG_OBJECT (dlbuf, "estimated-total %" G_GINT64_FORMAT,
1441 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1444 case GST_FORMAT_PERCENT:
1446 /* get our available data relative to the duration */
1449 gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, write_pos,
1454 case GST_FORMAT_BYTES:
1464 gst_query_set_buffering_range (query, format, start, stop,
1467 /* fill out the buffered ranges */
1469 while (gst_sparse_file_get_range_after (dlbuf->file, offset,
1470 &range_start, &range_stop)) {
1471 offset = range_stop;
1474 case GST_FORMAT_PERCENT:
1475 if (duration == -1) {
1479 range_start = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
1480 range_start, duration);
1481 range_stop = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
1482 range_stop, duration);
1485 case GST_FORMAT_BYTES:
1492 if (range_start == range_stop)
1494 GST_DEBUG_OBJECT (dlbuf,
1495 "range starting at %" G_GINT64_FORMAT " and finishing at %"
1496 G_GINT64_FORMAT, range_start, range_stop);
1497 gst_query_add_buffering_range (query, range_start, range_stop);
1502 case GST_QUERY_SCHEDULING:
1504 GstSchedulingFlags flags = 0;
1506 if (!gst_pad_peer_query (dlbuf->sinkpad, query))
1509 gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
1511 /* we can operate in pull mode when we are using a tempfile */
1512 flags |= GST_SCHEDULING_FLAG_SEEKABLE;
1513 gst_query_set_scheduling (query, flags, 0, -1, 0);
1514 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1515 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1519 /* peer handled other queries */
1520 if (!gst_pad_query_default (pad, parent, query))
1530 GST_DEBUG_OBJECT (dlbuf, "failed peer query");
1536 gst_download_buffer_handle_query (GstElement * element, GstQuery * query)
1538 GstDownloadBuffer *dlbuf = GST_DOWNLOAD_BUFFER (element);
1540 /* simply forward to the srcpad query function */
1541 return gst_download_buffer_handle_src_query (dlbuf->srcpad,
1542 GST_OBJECT_CAST (element), query);
1545 static GstFlowReturn
1546 gst_download_buffer_get_range (GstPad * pad, GstObject * parent, guint64 offset,
1547 guint length, GstBuffer ** buffer)
1549 GstDownloadBuffer *dlbuf;
1552 dlbuf = GST_DOWNLOAD_BUFFER_CAST (parent);
1554 GST_DOWNLOAD_BUFFER_MUTEX_LOCK_CHECK (dlbuf, dlbuf->srcresult, out_flushing);
1555 /* FIXME - function will block when the range is not yet available */
1556 ret = gst_download_buffer_read_buffer (dlbuf, offset, length, buffer);
1557 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1564 ret = dlbuf->srcresult;
1566 GST_DEBUG_OBJECT (dlbuf, "we are flushing");
1567 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1572 /* sink currently only operates in push mode */
1574 gst_download_buffer_sink_activate_mode (GstPad * pad, GstObject * parent,
1575 GstPadMode mode, gboolean active)
1578 GstDownloadBuffer *dlbuf;
1580 dlbuf = GST_DOWNLOAD_BUFFER (parent);
1583 case GST_PAD_MODE_PUSH:
1585 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1586 GST_DEBUG_OBJECT (dlbuf, "activating push mode");
1587 dlbuf->srcresult = GST_FLOW_OK;
1588 dlbuf->sinkresult = GST_FLOW_OK;
1589 dlbuf->unexpected = FALSE;
1590 reset_rate_timer (dlbuf);
1591 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1593 /* unblock chain function */
1594 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1595 GST_DEBUG_OBJECT (dlbuf, "deactivating push mode");
1596 dlbuf->srcresult = GST_FLOW_FLUSHING;
1597 dlbuf->sinkresult = GST_FLOW_FLUSHING;
1598 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1600 /* wait until it is unblocked and clean up */
1601 GST_PAD_STREAM_LOCK (pad);
1602 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1603 gst_download_buffer_locked_flush (dlbuf, TRUE, FALSE);
1604 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1605 GST_PAD_STREAM_UNLOCK (pad);
1616 /* src operating in push mode, we start a task on the source pad that pushes out
1617 * buffers from the dlbuf */
1619 gst_download_buffer_src_activate_push (GstPad * pad, GstObject * parent,
1622 gboolean result = FALSE;
1623 GstDownloadBuffer *dlbuf;
1625 dlbuf = GST_DOWNLOAD_BUFFER (parent);
1628 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1629 GST_DEBUG_OBJECT (dlbuf, "activating push mode");
1630 dlbuf->srcresult = GST_FLOW_OK;
1631 dlbuf->sinkresult = GST_FLOW_OK;
1632 dlbuf->unexpected = FALSE;
1634 gst_pad_start_task (pad, (GstTaskFunction) gst_download_buffer_loop,
1636 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1638 /* unblock loop function */
1639 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1640 GST_DEBUG_OBJECT (dlbuf, "deactivating push mode");
1641 dlbuf->srcresult = GST_FLOW_FLUSHING;
1642 dlbuf->sinkresult = GST_FLOW_FLUSHING;
1643 /* the item add signal will unblock */
1644 GST_DOWNLOAD_BUFFER_SIGNAL_ADD (dlbuf, -1);
1645 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1647 /* step 2, make sure streaming finishes */
1648 result = gst_pad_stop_task (pad);
1654 /* pull mode, downstream will call our getrange function */
1656 gst_download_buffer_src_activate_pull (GstPad * pad, GstObject * parent,
1660 GstDownloadBuffer *dlbuf;
1662 dlbuf = GST_DOWNLOAD_BUFFER (parent);
1665 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1666 /* open the temp file now */
1667 result = gst_download_buffer_open_temp_location_file (dlbuf);
1668 GST_DEBUG_OBJECT (dlbuf, "activating pull mode");
1669 dlbuf->srcresult = GST_FLOW_OK;
1670 dlbuf->sinkresult = GST_FLOW_OK;
1671 dlbuf->unexpected = FALSE;
1672 dlbuf->upstream_size = 0;
1673 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1675 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1676 GST_DEBUG_OBJECT (dlbuf, "deactivating pull mode");
1677 dlbuf->srcresult = GST_FLOW_FLUSHING;
1678 dlbuf->sinkresult = GST_FLOW_FLUSHING;
1679 /* this will unlock getrange */
1680 GST_DOWNLOAD_BUFFER_SIGNAL_ADD (dlbuf, -1);
1682 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1689 gst_download_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1690 GstPadMode mode, gboolean active)
1695 case GST_PAD_MODE_PULL:
1696 res = gst_download_buffer_src_activate_pull (pad, parent, active);
1698 case GST_PAD_MODE_PUSH:
1699 res = gst_download_buffer_src_activate_push (pad, parent, active);
1702 GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
1709 static GstStateChangeReturn
1710 gst_download_buffer_change_state (GstElement * element,
1711 GstStateChange transition)
1713 GstDownloadBuffer *dlbuf;
1714 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1716 dlbuf = GST_DOWNLOAD_BUFFER (element);
1718 switch (transition) {
1719 case GST_STATE_CHANGE_NULL_TO_READY:
1721 case GST_STATE_CHANGE_READY_TO_PAUSED:
1722 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1723 if (!gst_download_buffer_open_temp_location_file (dlbuf))
1724 ret = GST_STATE_CHANGE_FAILURE;
1725 gst_event_replace (&dlbuf->stream_start_event, NULL);
1726 gst_event_replace (&dlbuf->segment_event, NULL);
1727 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1729 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1735 if (ret == GST_STATE_CHANGE_FAILURE)
1738 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1740 if (ret == GST_STATE_CHANGE_FAILURE)
1743 switch (transition) {
1744 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1746 case GST_STATE_CHANGE_PAUSED_TO_READY:
1747 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1748 gst_download_buffer_close_temp_location_file (dlbuf);
1749 gst_event_replace (&dlbuf->stream_start_event, NULL);
1750 gst_event_replace (&dlbuf->segment_event, NULL);
1751 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1753 case GST_STATE_CHANGE_READY_TO_NULL:
1762 #define CAPACITY_CHANGE(elem) \
1763 update_buffering (elem);
1766 gst_download_buffer_set_temp_template (GstDownloadBuffer * dlbuf,
1767 const gchar * template)
1771 /* the element must be stopped in order to do this */
1772 GST_OBJECT_LOCK (dlbuf);
1773 state = GST_STATE (dlbuf);
1774 if (state != GST_STATE_READY && state != GST_STATE_NULL)
1776 GST_OBJECT_UNLOCK (dlbuf);
1778 /* set new location */
1779 g_free (dlbuf->temp_template);
1780 dlbuf->temp_template = g_strdup (template);
1787 GST_WARNING_OBJECT (dlbuf, "setting temp-template property in wrong state");
1788 GST_OBJECT_UNLOCK (dlbuf);
1793 gst_download_buffer_set_property (GObject * object,
1794 guint prop_id, const GValue * value, GParamSpec * pspec)
1796 GstDownloadBuffer *dlbuf = GST_DOWNLOAD_BUFFER (object);
1798 /* someone could change levels here, and since this
1799 * affects the get/put funcs, we need to lock for safety. */
1800 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1803 case PROP_MAX_SIZE_BYTES:
1804 dlbuf->max_level.bytes = g_value_get_uint (value);
1805 CAPACITY_CHANGE (dlbuf);
1807 case PROP_MAX_SIZE_TIME:
1808 dlbuf->max_level.time = g_value_get_uint64 (value);
1809 CAPACITY_CHANGE (dlbuf);
1811 case PROP_LOW_PERCENT:
1812 dlbuf->low_percent = g_value_get_int (value);
1814 case PROP_HIGH_PERCENT:
1815 dlbuf->high_percent = g_value_get_int (value);
1817 case PROP_TEMP_TEMPLATE:
1818 gst_download_buffer_set_temp_template (dlbuf, g_value_get_string (value));
1820 case PROP_TEMP_REMOVE:
1821 dlbuf->temp_remove = g_value_get_boolean (value);
1824 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1828 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
1832 gst_download_buffer_get_property (GObject * object,
1833 guint prop_id, GValue * value, GParamSpec * pspec)
1835 GstDownloadBuffer *dlbuf = GST_DOWNLOAD_BUFFER (object);
1837 GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
1840 case PROP_MAX_SIZE_BYTES:
1841 g_value_set_uint (value, dlbuf->max_level.bytes);
1843 case PROP_MAX_SIZE_TIME:
1844 g_value_set_uint64 (value, dlbuf->max_level.time);
1846 case PROP_LOW_PERCENT:
1847 g_value_set_int (value, dlbuf->low_percent);
1849 case PROP_HIGH_PERCENT:
1850 g_value_set_int (value, dlbuf->high_percent);
1852 case PROP_TEMP_TEMPLATE:
1853 g_value_set_string (value, dlbuf->temp_template);
1855 case PROP_TEMP_LOCATION:
1856 g_value_set_string (value, dlbuf->temp_location);
1858 case PROP_TEMP_REMOVE:
1859 g_value_set_boolean (value, dlbuf->temp_remove);
1862 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1866 GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);