/**
* SECTION:element-downloadbuffer
+ * @title: downloadbuffer
*
* The downloadbuffer element provides on-disk buffering and caching of, typically,
* a network file. temp-template should be set to a value such as
* With max-size-bytes and max-size-time you can configure the buffering limits.
* The downloadbuffer element will try to read-ahead these amounts of data. When
* the amount of read-ahead data drops below low-percent of the configured max,
- * the element will start emiting BUFFERING messages until high-percent of max is
+ * the element will start emitting BUFFERING messages until high-percent of max is
* reached again.
*
* The downloadbuffer provides push and pull based scheduling on its source pad
* When the downloadbuffer has completely downloaded the media, it will
* post an application message named <classname>"GstCacheDownloadComplete"</classname>
* with the following information:
- * <itemizedlist>
- * <listitem>
- * <para>
+ *
+ * *
* G_TYPE_STRING
* <classname>"location"</classname>:
* the location of the completely downloaded file.
- * </para>
- * </listitem>
- * </itemizedlist>
+ *
*/
#ifdef HAVE_CONFIG_H
#include <io.h> /* lseek, open, close, read */
#undef lseek
#define lseek _lseeki64
-#undef off_t
-#define off_t guint64
#else
#include <unistd.h>
#endif
+#ifdef __BIONIC__
+#include <fcntl.h>
+#endif
+
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
G_DEFINE_TYPE_WITH_CODE (GstDownloadBuffer, gst_download_buffer,
GST_TYPE_ELEMENT, _do_init);
-static void update_buffering (GstDownloadBuffer * dlbuf);
+static GstMessage *update_buffering (GstDownloadBuffer * dlbuf);
static void gst_download_buffer_finalize (GObject * object);
g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
"Max. amount of data to buffer (bytes, 0=disable)",
0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+ G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
"Max. amount of data to buffer (in ns, 0=disable)", 0, G_MAXUINT64,
- DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+ G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
g_param_spec_int ("low-percent", "Low percent",
/* set several parent class virtual functions */
gobject_class->finalize = gst_download_buffer_finalize;
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&srctemplate));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&sinktemplate));
+ gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
+ gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
gst_element_class_set_static_metadata (gstelement_class, "DownloadBuffer",
"Generic", "Download Buffer element",
dlbuf->waiting_add = FALSE;
g_cond_init (&dlbuf->item_add);
- dlbuf->buffering_percent = 100;
-
/* tempfile related */
dlbuf->temp_template = NULL;
dlbuf->temp_location = NULL;
}
static void
+reset_positions (GstDownloadBuffer * dlbuf)
+{
+ dlbuf->write_pos = 0;
+ dlbuf->read_pos = 0;
+ dlbuf->filling = TRUE;
+ dlbuf->buffering_percent = 0;
+ dlbuf->is_buffering = TRUE;
+ dlbuf->seeking = FALSE;
+ GST_DOWNLOAD_BUFFER_CLEAR_LEVEL (dlbuf->cur_level);
+}
+
+static void
reset_rate_timer (GstDownloadBuffer * dlbuf)
{
dlbuf->bytes_in = 0;
#define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
static void
-update_time_level (GstDownloadBuffer * dlbuf)
+update_levels (GstDownloadBuffer * dlbuf, guint bytes)
{
+ dlbuf->cur_level.bytes = bytes;
+
if (dlbuf->byte_in_rate > 0.0) {
dlbuf->cur_level.time =
dlbuf->cur_level.bytes / dlbuf->byte_in_rate * GST_SECOND;
}
+
GST_DEBUG ("levels: bytes %u/%u, time %" GST_TIME_FORMAT "/%" GST_TIME_FORMAT,
dlbuf->cur_level.bytes, dlbuf->max_level.bytes,
GST_TIME_ARGS (dlbuf->cur_level.time),
GST_TIME_ARGS (dlbuf->max_level.time));
- /* update the buffering */
- update_buffering (dlbuf);
-}
-
-static void
-update_levels (GstDownloadBuffer * dlbuf, guint bytes)
-{
- dlbuf->cur_level.bytes = bytes;
- update_time_level (dlbuf);
}
static void
}
}
-static void
+static GstMessage *
update_buffering (GstDownloadBuffer * dlbuf)
{
gint percent;
gboolean post = FALSE;
+ GstMessage *message = NULL;
if (!get_buffering_percent (dlbuf, NULL, &percent))
- return;
+ return NULL;
if (dlbuf->is_buffering) {
post = TRUE;
}
if (post) {
- GstMessage *message;
GstBufferingMode mode;
gint avg_in, avg_out;
gint64 buffering_left;
(gint) percent);
gst_message_set_buffering_stats (message, mode,
avg_in, avg_out, buffering_left);
-
- gst_element_post_message (GST_ELEMENT_CAST (dlbuf), message);
}
+
+ return message;
}
static gboolean
return threshold;
}
+/* called with DOWNLOAD_BUFFER_MUTEX */
static void
gst_download_buffer_update_upstream_size (GstDownloadBuffer * dlbuf)
{
}
}
+/* called with DOWNLOAD_BUFFER_MUTEX */
static gboolean
check_upstream_size (GstDownloadBuffer * dlbuf, gsize offset, guint * length)
{
out_flushing:
{
GST_DEBUG_OBJECT (dlbuf, "we are flushing");
+ g_clear_error (&error);
gst_buffer_unmap (buf, &info);
if (*buffer == NULL)
gst_buffer_unref (buf);
GST_DEBUG_OBJECT (dlbuf, "opening temp file %s", dlbuf->temp_template);
- /* If temp_template was set, allocate a filename and open that filen */
+ /* If temp_template was set, allocate a filename and open that file */
/* nothing to do */
if (dlbuf->temp_template == NULL)
/* make copy of the template, we don't want to change this */
name = g_strdup (dlbuf->temp_template);
+#ifdef __BIONIC__
+ fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
+#else
fd = g_mkstemp (name);
+#endif
if (fd == -1)
goto mkstemp_failed;
g_free (dlbuf->temp_location);
dlbuf->temp_location = name;
dlbuf->temp_fd = fd;
+ reset_positions (dlbuf);
GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
{
if (clear_temp)
gst_download_buffer_flush_temp_file (dlbuf);
- GST_DOWNLOAD_BUFFER_CLEAR_LEVEL (dlbuf->cur_level);
+ reset_positions (dlbuf);
gst_event_replace (&dlbuf->stream_start_event, NULL);
gst_event_replace (&dlbuf->segment_event, NULL);
}
}
default:
if (GST_EVENT_IS_SERIALIZED (event)) {
+ GstMessage *msg = NULL;
+
/* serialized events go in the buffer */
GST_DOWNLOAD_BUFFER_MUTEX_LOCK_CHECK (dlbuf, dlbuf->sinkresult,
out_flushing);
* filled and we can read all data from the dlbuf. */
/* update the buffering status */
update_levels (dlbuf, dlbuf->max_level.bytes);
+ /* update the buffering */
+ msg = update_buffering (dlbuf);
/* wakeup the waiter and let it recheck */
GST_DOWNLOAD_BUFFER_SIGNAL_ADD (dlbuf, -1);
break;
}
gst_event_unref (event);
GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
+ if (msg != NULL)
+ gst_element_post_message (GST_ELEMENT_CAST (dlbuf), msg);
} else {
/* non-serialized events are passed upstream. */
ret = gst_pad_push_event (dlbuf->srcpad, event);
guint64 offset;
gsize res, available;
GError *error = NULL;
+ GstMessage *msg = NULL;
dlbuf = GST_DOWNLOAD_BUFFER (parent);
update_levels (dlbuf, 0);
}
+ /* update the buffering */
+ msg = update_buffering (dlbuf);
+
GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
+ if (msg != NULL)
+ gst_element_post_message (GST_ELEMENT_CAST (dlbuf), msg);
+
return GST_FLOW_OK;
/* ERRORS */
}
write_error:
{
+ GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
gst_buffer_unmap (buffer, &info);
gst_buffer_unref (buffer);
GST_ELEMENT_ERROR (dlbuf, RESOURCE, WRITE,
dlbuf->write_pos = dlbuf->upstream_size;
dlbuf->filling = FALSE;
update_levels (dlbuf, dlbuf->max_level.bytes);
+ msg = update_buffering (dlbuf);
+
gst_element_post_message (GST_ELEMENT_CAST (dlbuf),
gst_message_new_element (GST_OBJECT_CAST (dlbuf),
gst_structure_new ("GstCacheDownloadComplete",
"location", G_TYPE_STRING, dlbuf->temp_location, NULL)));
+
GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
+ if (msg != NULL)
+ gst_element_post_message (GST_ELEMENT_CAST (dlbuf), msg);
+
return GST_FLOW_EOS;
}
}
GstDownloadBuffer *dlbuf;
GstFlowReturn ret;
GstBuffer *buffer = NULL;
+ GstMessage *msg = NULL;
dlbuf = GST_DOWNLOAD_BUFFER (GST_PAD_PARENT (pad));
if (ret != GST_FLOW_OK)
goto out_flushing;
+ /* update the buffering */
+ msg = update_buffering (dlbuf);
+
g_atomic_int_set (&dlbuf->downstream_may_block, 1);
GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
+ if (msg != NULL)
+ gst_element_post_message (GST_ELEMENT_CAST (dlbuf), msg);
+
ret = gst_pad_push (dlbuf->srcpad, buffer);
g_atomic_int_set (&dlbuf->downstream_may_block, 0);
* file. */
gst_pad_push_event (dlbuf->srcpad, gst_event_new_eos ());
} else if ((ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
- GST_ELEMENT_ERROR (dlbuf, STREAM, FAILED,
- (_("Internal data flow error.")),
- ("streaming task paused, reason %s (%d)",
- gst_flow_get_name (ret), ret));
+ GST_ELEMENT_FLOW_ERROR (dlbuf, ret);
gst_pad_push_event (dlbuf->srcpad, gst_event_new_eos ());
}
return;
switch (format) {
case GST_FORMAT_BYTES:
peer_pos -= dlbuf->cur_level.bytes;
+ if (peer_pos < 0) /* Clamp result to 0 */
+ peer_pos = 0;
break;
case GST_FORMAT_TIME:
peer_pos -= dlbuf->cur_level.time;
+ if (peer_pos < 0) /* Clamp result to 0 */
+ peer_pos = 0;
break;
default:
GST_WARNING_OBJECT (dlbuf, "dropping query in %s format, don't "
gint64 duration;
gsize offset, range_start, range_stop;
+ GST_DOWNLOAD_BUFFER_MUTEX_LOCK (dlbuf);
write_pos = dlbuf->write_pos;
/* get duration of upstream in bytes */
GST_DEBUG_OBJECT (dlbuf, "percent %d, duration %" G_GINT64_FORMAT
", writing %" G_GINT64_FORMAT, percent, duration, write_pos);
- /* calculate remaining and total download time */
- if (duration > write_pos && avg_in > 0.0)
- estimated_total = ((duration - write_pos) * 1000) / avg_in;
- else
- estimated_total = -1;
-
- GST_DEBUG_OBJECT (dlbuf, "estimated-total %" G_GINT64_FORMAT,
- estimated_total);
-
gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
- switch (format) {
- case GST_FORMAT_PERCENT:
- start = 0;
- /* get our available data relative to the duration */
- if (duration != -1)
- stop =
- gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, write_pos,
- duration);
- else
- stop = -1;
- break;
- case GST_FORMAT_BYTES:
- start = 0;
- stop = write_pos;
- break;
- default:
- start = -1;
- stop = -1;
- break;
- }
-
- gst_query_set_buffering_range (query, format, start, stop,
- estimated_total);
-
/* fill out the buffered ranges */
- offset = 0;
+ start = offset = 0;
+ stop = -1;
+ estimated_total = -1;
while (gst_sparse_file_get_range_after (dlbuf->file, offset,
&range_start, &range_stop)) {
+ gboolean current_range;
+
+ GST_DEBUG_OBJECT (dlbuf,
+ "range starting at %" G_GSIZE_FORMAT " and finishing at %"
+ G_GSIZE_FORMAT, range_start, range_stop);
+
offset = range_stop;
+ /* find the range we are currently downloading, we'll remember it
+ * after we convert to the target format */
+ if (range_start <= write_pos && range_stop >= write_pos) {
+ current_range = TRUE;
+ /* calculate remaining and total download time */
+ if (duration >= range_stop && avg_in > 0.0)
+ estimated_total = ((duration - range_stop) * 1000) / avg_in;
+ } else
+ current_range = FALSE;
+
switch (format) {
case GST_FORMAT_PERCENT:
+ /* get our available data relative to the duration */
if (duration == -1) {
range_start = 0;
range_stop = 0;
range_stop = -1;
break;
}
+
+ if (current_range) {
+ /* we are currently downloading this range */
+ start = range_start;
+ stop = range_stop;
+ }
+ GST_DEBUG_OBJECT (dlbuf,
+ "range to format: %" G_GSIZE_FORMAT " - %" G_GSIZE_FORMAT,
+ range_start, range_stop);
if (range_start == range_stop)
continue;
- GST_DEBUG_OBJECT (dlbuf,
- "range starting at %" G_GSIZE_FORMAT " and finishing at %"
- G_GSIZE_FORMAT, range_start, range_stop);
gst_query_add_buffering_range (query, range_start, range_stop);
}
+
+ GST_DEBUG_OBJECT (dlbuf, "estimated-total %" G_GINT64_FORMAT,
+ estimated_total);
+
+ gst_query_set_buffering_range (query, format, start, stop,
+ estimated_total);
+
+ GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
}
break;
}
{
GstDownloadBuffer *dlbuf;
GstFlowReturn ret;
+ GstMessage *msg = NULL;
dlbuf = GST_DOWNLOAD_BUFFER_CAST (parent);
GST_DOWNLOAD_BUFFER_MUTEX_LOCK_CHECK (dlbuf, dlbuf->srcresult, out_flushing);
/* FIXME - function will block when the range is not yet available */
ret = gst_download_buffer_read_buffer (dlbuf, offset, length, buffer);
+ /* update the buffering */
+ msg = update_buffering (dlbuf);
+
GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
+ if (msg != NULL)
+ gst_element_post_message (GST_ELEMENT_CAST (dlbuf), msg);
+
return ret;
/* ERRORS */
guint prop_id, const GValue * value, GParamSpec * pspec)
{
GstDownloadBuffer *dlbuf = GST_DOWNLOAD_BUFFER (object);
+ GstMessage *msg = NULL;
/* someone could change levels here, and since this
* affects the get/put funcs, we need to lock for safety. */
switch (prop_id) {
case PROP_MAX_SIZE_BYTES:
dlbuf->max_level.bytes = g_value_get_uint (value);
- CAPACITY_CHANGE (dlbuf);
+ msg = CAPACITY_CHANGE (dlbuf);
break;
case PROP_MAX_SIZE_TIME:
dlbuf->max_level.time = g_value_get_uint64 (value);
- CAPACITY_CHANGE (dlbuf);
+ msg = CAPACITY_CHANGE (dlbuf);
break;
case PROP_LOW_PERCENT:
dlbuf->low_percent = g_value_get_int (value);
}
GST_DOWNLOAD_BUFFER_MUTEX_UNLOCK (dlbuf);
+
+ if (msg != NULL)
+ gst_element_post_message (GST_ELEMENT_CAST (dlbuf), msg);
+
}
static void