/**
* SECTION:element-queue2
+ * @title: queue2
*
* Data is queued until one of the limits specified by the
* #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
#include <unistd.h>
#endif
+#ifdef __BIONIC__ /* Android */
+#undef lseek
+#define lseek lseek64
+#undef off_t
+#define off_t guint64
+#include <fcntl.h>
+#endif
+
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
#define DEFAULT_MAX_SIZE_BYTES (2 * 1024 * 1024) /* 2 MB */
#define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND /* 2 seconds */
#define DEFAULT_USE_BUFFERING FALSE
+#define DEFAULT_USE_TAGS_BITRATE FALSE
#define DEFAULT_USE_RATE_ESTIMATE TRUE
#define DEFAULT_LOW_PERCENT 10
#define DEFAULT_HIGH_PERCENT 99
+#define DEFAULT_LOW_WATERMARK 0.01
+#define DEFAULT_HIGH_WATERMARK 0.99
#define DEFAULT_TEMP_REMOVE TRUE
#define DEFAULT_RING_BUFFER_MAX_SIZE 0
PROP_MAX_SIZE_BYTES,
PROP_MAX_SIZE_TIME,
PROP_USE_BUFFERING,
+ PROP_USE_TAGS_BITRATE,
PROP_USE_RATE_ESTIMATE,
PROP_LOW_PERCENT,
PROP_HIGH_PERCENT,
+ PROP_LOW_WATERMARK,
+ PROP_HIGH_WATERMARK,
PROP_TEMP_TEMPLATE,
PROP_TEMP_LOCATION,
PROP_TEMP_REMOVE,
PROP_RING_BUFFER_MAX_SIZE,
+ PROP_AVG_IN_RATE,
PROP_LAST
};
+/* Explanation for buffer levels and percentages:
+ *
+ * The buffering_level functions here return a value in a normalized range
+ * that specifies the queue's current fill level. The range goes from 0 to
+ * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
+ *
+ * This is not to be confused with the buffering_percent value, which is
+ * a *relative* quantity - relative to the low/high watermarks.
+ * buffering_percent = 0% means buffering_level is at the low watermark.
+ * buffering_percent = 100% means buffering_level is at the high watermark.
+ * buffering_percent is used for determining if the fill level has reached
+ * the high watermark, and for producing BUFFERING messages. This value
+ * always uses a 0..100 range (since it is a percentage).
+ *
+ * To avoid future confusions, whenever "buffering level" is mentioned, it
+ * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
+ * range. Whenever "buffering_percent" is mentioned, it refers to the
+ * percentage value that is relative to the low/high watermark. */
+
+/* Using a buffering level range of 0..1000000 to allow for a
+ * resolution in ppm (1 ppm = 0.0001%) */
+#define MAX_BUFFERING_LEVEL 1000000
+
+/* How much 1% makes up in the buffer level range */
+#define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
+
#define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START { \
l.buffers = 0; \
l.bytes = 0; \
static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
static void gst_queue2_loop (GstPad * pad);
-static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
- GstEvent * event);
+static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad,
+ GstObject * parent, GstEvent * event);
static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
GstQuery * query);
static gboolean gst_queue2_is_filled (GstQueue2 * queue);
static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
-static void update_in_rates (GstQueue2 * queue);
+static void update_in_rates (GstQueue2 * queue, gboolean force);
+static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue);
static void gst_queue2_post_buffering (GstQueue2 * queue);
typedef enum
"Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_USE_TAGS_BITRATE,
+ g_param_spec_boolean ("use-tags-bitrate", "Use bitrate from tags",
+ "Use a bitrate from upstream tags to estimate buffer duration if not provided",
+ DEFAULT_USE_TAGS_BITRATE,
+ G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+ G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
"Estimate the bitrate of the stream to calculate time level",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
g_param_spec_int ("low-percent", "Low percent",
- "Low threshold for buffering to start. Only used if use-buffering is True",
- 0, 100, DEFAULT_LOW_PERCENT,
+ "Low threshold for buffering to start. Only used if use-buffering is True "
+ "(Deprecated: use low-watermark instead)",
+ 0, 100, DEFAULT_LOW_WATERMARK * 100,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
g_param_spec_int ("high-percent", "High percent",
+ "High threshold for buffering to finish. Only used if use-buffering is True "
+ "(Deprecated: use high-watermark instead)",
+ 0, 100, DEFAULT_HIGH_WATERMARK * 100,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
+ g_param_spec_double ("low-watermark", "Low watermark",
+ "Low threshold for buffering to start. Only used if use-buffering is True",
+ 0.0, 1.0, DEFAULT_LOW_WATERMARK,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
+ g_param_spec_double ("high-watermark", "High watermark",
"High threshold for buffering to finish. Only used if use-buffering is True",
- 0, 100, DEFAULT_HIGH_PERCENT,
+ 0.0, 1.0, DEFAULT_HIGH_WATERMARK,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstQueue2:avg-in-rate
+ *
+ * The average input data rate.
+ */
+ g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE,
+ g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)",
+ "Average input data rate (bytes/s)",
+ 0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
/* set several parent class virtual functions */
gobject_class->finalize = gst_queue2_finalize;
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&srctemplate));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&sinktemplate));
+ gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
+ gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
"Generic",
GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
gst_pad_set_activatemode_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
- gst_pad_set_event_function (queue->sinkpad,
+ gst_pad_set_event_full_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
gst_pad_set_query_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
queue->use_buffering = DEFAULT_USE_BUFFERING;
queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
- queue->low_percent = DEFAULT_LOW_PERCENT;
- queue->high_percent = DEFAULT_HIGH_PERCENT;
+ queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
+ queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
/* now configure the values, we use these to track timestamps on the
* sinkpad. */
if (segment->format != GST_FORMAT_TIME) {
- /* non-time format, pretent the current time segment is closed with a
+ /* non-time format, pretend the current time segment is closed with a
* 0 start and unknown stop time. */
segment->format = GST_FORMAT_TIME;
segment->start = 0;
/* take a buffer and update segment, updating the time level of the queue. */
static void
apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
- gboolean is_sink)
+ guint64 size, gboolean is_sink)
{
GstClockTime duration, timestamp;
- timestamp = GST_BUFFER_TIMESTAMP (buffer);
+ timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
duration = GST_BUFFER_DURATION (buffer);
+ /* If we have no duration, pick one from the bitrate if we can */
+ if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) {
+ guint bitrate =
+ is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
+ if (bitrate)
+ duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
+ }
+
/* if no timestamp is set, assume it's continuous with the previous
* time */
if (timestamp == GST_CLOCK_TIME_NONE)
update_time_level (queue);
}
+struct BufListData
+{
+ GstClockTime timestamp;
+ guint bitrate;
+};
+
static gboolean
buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
{
- GstClockTime *timestamp = data;
+ struct BufListData *bld = data;
+ GstClockTime *timestamp = &bld->timestamp;
+ GstClockTime btime;
- GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT
+ GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
" duration %" GST_TIME_FORMAT, idx,
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)),
+ GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
+ GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
- if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf))
- *timestamp = GST_BUFFER_TIMESTAMP (*buf);
+ btime = GST_BUFFER_DTS_OR_PTS (*buf);
+ if (GST_CLOCK_TIME_IS_VALID (btime))
+ *timestamp = btime;
if (GST_BUFFER_DURATION_IS_VALID (*buf))
*timestamp += GST_BUFFER_DURATION (*buf);
+ else if (bld->bitrate != 0) {
+ guint64 size = gst_buffer_get_size (*buf);
+
+ /* If we have no duration, pick one from the bitrate if we can */
+ *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size);
+ }
+
GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
return TRUE;
apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
GstSegment * segment, gboolean is_sink)
{
- GstClockTime timestamp;
+ struct BufListData bld;
/* if no timestamp is set, assume it's continuous with the previous time */
- timestamp = segment->position;
+ bld.timestamp = segment->position;
+
+ if (queue->use_tags_bitrate) {
+ if (is_sink)
+ bld.bitrate = queue->sink_tags_bitrate;
+ else
+ bld.bitrate = queue->src_tags_bitrate;
+ } else
+ bld.bitrate = 0;
- gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, ×tamp);
+ gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
- GST_TIME_ARGS (timestamp));
+ GST_TIME_ARGS (bld.timestamp));
- segment->position = timestamp;
+ segment->position = bld.timestamp;
if (is_sink)
queue->sink_tainted = TRUE;
update_time_level (queue);
}
+static inline gint
+normalize_to_buffering_level (guint64 cur_level, guint64 max_level,
+ guint64 alt_max)
+{
+ guint64 p;
+
+ if (max_level == 0)
+ return 0;
+
+ if (alt_max > 0)
+ p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL,
+ MIN (max_level, alt_max));
+ else
+ p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level);
+
+ return MIN (p, MAX_BUFFERING_LEVEL);
+}
+
static gboolean
-get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering,
- gint * percent)
+get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
+ gint * buffering_level)
{
- gint perc;
+ gint buflevel, buflevel2;
- if (queue->high_percent <= 0) {
- if (percent)
- *percent = 100;
+ if (queue->high_watermark <= 0) {
+ if (buffering_level)
+ *buffering_level = MAX_BUFFERING_LEVEL;
if (is_buffering)
*is_buffering = FALSE;
return FALSE;
}
-#define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
+#define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
+ normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
if (queue->is_eos) {
/* on EOS we are always 100% full, we set the var here so that it we can
* reuse the logic below to stop buffering */
- perc = 100;
+ buflevel = MAX_BUFFERING_LEVEL;
GST_LOG_OBJECT (queue, "we are EOS");
} else {
- /* figure out the percent we are filled, we take the max of all formats. */
+ GST_LOG_OBJECT (queue,
+ "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
+ queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
+ queue->cur_level.buffers);
+
+ /* figure out the buffering level we are filled, we take the max of all formats. */
if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
- perc = GET_PERCENT (bytes, 0);
+ buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0);
} else {
guint64 rb_size = queue->ring_buffer_max_size;
- perc = GET_PERCENT (bytes, rb_size);
+ buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size);
}
- perc = MAX (perc, GET_PERCENT (time, 0));
- perc = MAX (perc, GET_PERCENT (buffers, 0));
+
+ buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0);
+ buflevel = MAX (buflevel, buflevel2);
+
+ buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0);
+ buflevel = MAX (buflevel, buflevel2);
/* also apply the rate estimate when we need to */
- if (queue->use_rate_estimate)
- perc = MAX (perc, GET_PERCENT (rate_time, 0));
+ if (queue->use_rate_estimate) {
+ buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0);
+ buflevel = MAX (buflevel, buflevel2);
+ }
+
+ /* Don't get to 0% unless we're really empty */
+ if (queue->cur_level.bytes > 0)
+ buflevel = MAX (1, buflevel);
}
-#undef GET_PERCENT
+#undef GET_BUFFER_LEVEL_FOR_QUANTITY
if (is_buffering)
*is_buffering = queue->is_buffering;
- /* scale to high percent so that it becomes the 100% mark */
- perc = perc * 100 / queue->high_percent;
- /* clip */
- if (perc > 100)
- perc = 100;
-
- if (percent)
- *percent = perc;
+ if (buffering_level)
+ *buffering_level = buflevel;
- GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering,
- perc);
+ GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering,
+ buflevel);
return TRUE;
}
+static gint
+convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level)
+{
+ int percent;
+
+ /* scale so that if buffering_level equals the high watermark,
+ * the percentage is 100% */
+ percent = buffering_level * 100 / queue->high_watermark;
+ /* clip */
+ if (percent > 100)
+ percent = 100;
+
+ return percent;
+}
+
static void
get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
gint * avg_in, gint * avg_out, gint64 * buffering_left)
}
}
-static void
-gst_queue2_post_buffering (GstQueue2 * queue)
+/* Called with the lock taken */
+static GstMessage *
+gst_queue2_get_buffering_message (GstQueue2 * queue)
{
GstMessage *msg = NULL;
- g_mutex_lock (&queue->buffering_post_lock);
- GST_QUEUE2_MUTEX_LOCK (queue);
if (queue->percent_changed) {
gint percent = queue->buffering_percent;
gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
queue->avg_out, queue->buffering_left);
}
+
+ return msg;
+}
+
+static void
+gst_queue2_post_buffering (GstQueue2 * queue)
+{
+ GstMessage *msg = NULL;
+
+ g_mutex_lock (&queue->buffering_post_lock);
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ msg = gst_queue2_get_buffering_message (queue);
GST_QUEUE2_MUTEX_UNLOCK (queue);
if (msg != NULL)
static void
update_buffering (GstQueue2 * queue)
{
- gint percent;
+ gint buffering_level, percent;
/* Ensure the variables used to calculate buffering state are up-to-date. */
if (queue->current)
update_cur_level (queue, queue->current);
- update_in_rates (queue);
+ update_in_rates (queue, FALSE);
- if (!get_buffering_percent (queue, NULL, &percent))
+ if (!get_buffering_level (queue, NULL, &buffering_level))
return;
+ percent = convert_to_buffering_percent (queue, buffering_level);
+
if (queue->is_buffering) {
/* if we were buffering see if we reached the high watermark */
if (percent >= 100)
} else {
/* we were not buffering, check if we need to start buffering if we drop
* below the low threshold */
- if (percent < queue->low_percent) {
+ if (buffering_level < queue->low_watermark) {
queue->is_buffering = TRUE;
SET_PERCENT (queue, percent);
}
queue->byte_in_rate = 0.0;
queue->byte_in_period = 0;
queue->byte_out_rate = 0.0;
+ queue->last_update_in_rates_elapsed = 0.0;
queue->last_in_elapsed = 0.0;
queue->last_out_elapsed = 0.0;
queue->in_timer_started = FALSE;
#define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
static void
-update_in_rates (GstQueue2 * queue)
+update_in_rates (GstQueue2 * queue, gboolean force)
{
gdouble elapsed, period;
gdouble byte_in_rate;
return;
}
- elapsed = g_timer_elapsed (queue->in_timer, NULL);
+ queue->last_update_in_rates_elapsed = elapsed =
+ g_timer_elapsed (queue->in_timer, NULL);
/* recalc after each interval. */
- if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
+ if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
period = elapsed - queue->last_in_elapsed;
GST_DEBUG_OBJECT (queue,
else
buf = *buffer;
- gst_buffer_map (buf, &info, GST_MAP_WRITE);
+ if (!gst_buffer_map (buf, &info, GST_MAP_WRITE))
+ goto buffer_write_fail;
data = info.data;
GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
gst_buffer_unref (buf);
return ret;
}
+buffer_write_fail:
+ {
+ GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL),
+ ("Can't write to buffer"));
+ if (*buffer == NULL)
+ gst_buffer_unref (buf);
+ return GST_FLOW_ERROR;
+ }
}
/* should be called with QUEUE_LOCK */
GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
- /* If temp_template was set, allocate a filename and open that filen */
+ /* If temp_template was set, allocate a filename and open that file */
/* nothing to do */
if (queue->temp_template == NULL)
/* make copy of the template, we don't want to change this */
name = g_strdup (queue->temp_template);
+
+#ifdef __BIONIC__
+ fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
+#else
fd = g_mkstemp (name);
+#endif
+
if (fd == -1)
goto mkstemp_failed;
ring_buffer = queue->ring_buffer;
rb_size = queue->ring_buffer_max_size;
- gst_buffer_map (buffer, &info, GST_MAP_READ);
+ if (!gst_buffer_map (buffer, &info, GST_MAP_READ))
+ goto buffer_read_error;
size = info.size;
data = info.data;
guint64 range_data_start, range_data_end;
GstQueue2Range *range_to_destroy = NULL;
+ if (range == queue->current)
+ goto next_range;
+
range_data_start = range->rb_offset;
range_data_end = range->rb_writing_pos;
update_cur_level (queue, queue->current);
/* update the buffering status */
- if (queue->use_buffering)
+ if (queue->use_buffering) {
+ GstMessage *msg;
update_buffering (queue);
+ msg = gst_queue2_get_buffering_message (queue);
+ if (msg) {
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+ g_mutex_lock (&queue->buffering_post_lock);
+ gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
+ g_mutex_unlock (&queue->buffering_post_lock);
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ }
+ }
GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
gst_buffer_unmap (buffer, &info);
return FALSE;
}
+buffer_read_error:
+ {
+ GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL),
+ ("Can't read from buffer"));
+ return FALSE;
+ }
}
static gboolean
queue->bytes_in += size;
/* apply new buffer to segment stats */
- apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
+ apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE);
/* update the byterate stats */
- update_in_rates (queue);
+ update_in_rates (queue, FALSE);
if (!QUEUE_IS_USING_QUEUE (queue)) {
/* FIXME - check return value? */
apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
/* update the byterate stats */
- update_in_rates (queue);
+ update_in_rates (queue, FALSE);
if (!QUEUE_IS_USING_QUEUE (queue)) {
gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
* filled and we can read all data from the queue. */
GST_DEBUG_OBJECT (queue, "we have EOS");
queue->is_eos = TRUE;
+ /* Force updating the input bitrate */
+ update_in_rates (queue, TRUE);
break;
case GST_EVENT_SEGMENT:
apply_segment (queue, event, &queue->sink_segment, TRUE);
/* ERRORS */
unexpected_event:
{
- g_warning
- ("Unexpected event of kind %s can't be added in temp file of queue %s ",
- gst_event_type_get_name (GST_EVENT_TYPE (item)),
- GST_OBJECT_NAME (queue));
+ gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
+
+ GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
+ "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
+ GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
gst_event_unref (GST_EVENT_CAST (item));
return;
}
}
queue->bytes_out += size;
- apply_buffer (queue, buffer, &queue->src_segment, FALSE);
+ apply_buffer (queue, buffer, &queue->src_segment, size, FALSE);
/* update the byterate stats */
update_out_rates (queue);
/* update the buffering */
}
}
-static gboolean
+static GstFlowReturn
gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event)
{
/* unblock the loop and chain functions */
GST_QUEUE2_SIGNAL_ADD (queue);
GST_QUEUE2_SIGNAL_DEL (queue);
- queue->last_query = FALSE;
- g_cond_signal (&queue->query_handled);
GST_QUEUE2_MUTEX_UNLOCK (queue);
/* make sure it pauses, this should happen since we sent
* flush_start downstream. */
gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
+
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ queue->last_query = FALSE;
+ g_cond_signal (&queue->query_handled);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
GST_QUEUE2_MUTEX_LOCK (queue);
/* flush the sink pad */
queue->is_eos = FALSE;
queue->unexpected = FALSE;
queue->seeking = FALSE;
+ queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
/* reset rate counters */
reset_rate_timer (queue);
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
queue->unexpected = FALSE;
queue->sinkresult = GST_FLOW_OK;
queue->seeking = FALSE;
+ queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event);
}
break;
}
+ case GST_EVENT_TAG:{
+ if (queue->use_tags_bitrate) {
+ GstTagList *tags;
+ guint bitrate;
+
+ gst_event_parse_tag (event, &tags);
+ if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
+ gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ queue->sink_tags_bitrate = bitrate;
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+ GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
+ }
+ }
+ /* Fall-through */
+ }
default:
if (GST_EVENT_IS_SERIALIZED (event)) {
/* serialized events go in the queue */
} else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
if (queue->srcresult == GST_FLOW_NOT_LINKED
|| queue->srcresult < GST_FLOW_EOS) {
- GST_ELEMENT_ERROR (queue, STREAM, FAILED,
- (_("Internal data flow error.")),
- ("streaming task paused, reason %s (%d)",
- gst_flow_get_name (queue->srcresult), queue->srcresult));
+ GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
}
goto out_flow_error;
}
}
break;
}
- return ret;
+ if (ret == FALSE)
+ return GST_FLOW_ERROR;
+ return GST_FLOW_OK;
/* ERRORS */
out_flushing:
GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event);
- return FALSE;
+ return GST_FLOW_FLUSHING;
}
out_eos:
{
GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event);
- return FALSE;
+ return GST_FLOW_EOS;
}
out_flow_error:
{
gst_flow_get_name (queue->srcresult));
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event);
- return FALSE;
+ return queue->srcresult;
}
}
GST_QUEUE2_ITEM_TYPE_QUERY);
STATUS (queue, queue->sinkpad, "wait for QUERY");
- g_cond_wait (&queue->query_handled, &queue->qlock);
+ while (queue->sinkresult == GST_FLOW_OK &&
+ queue->last_handled_query != query)
+ g_cond_wait (&queue->query_handled, &queue->qlock);
+ queue->last_handled_query = NULL;
if (queue->sinkresult != GST_FLOW_OK)
goto out_flushing;
res = queue->last_query;
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
+ if (type == GST_EVENT_TAG) {
+ if (queue->use_tags_bitrate) {
+ GstTagList *tags;
+ guint bitrate;
+
+ gst_event_parse_tag (event, &tags);
+ if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
+ gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ queue->src_tags_bitrate = bitrate;
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+ GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
+ }
+ }
+ }
+
gst_pad_push_event (queue->srcpad, event);
/* if we're EOS, return EOS so that the task pauses. */
GstQuery *query = GST_QUERY_CAST (data);
GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
+ queue->last_handled_query = query;
queue->last_query = gst_pad_peer_query (queue->srcpad, query);
GST_LOG_OBJECT (queue->srcpad, "Peered query");
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
/* let app know about us giving up if upstream is not expected to do so */
/* EOS is already taken care of elsewhere */
if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
- GST_ELEMENT_ERROR (queue, STREAM, FAILED,
- (_("Internal data flow error.")),
- ("streaming task paused, reason %s (%d)",
- gst_flow_get_name (ret), ret));
+ GST_ELEMENT_FLOW_ERROR (queue, ret);
gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
}
return;
switch (format) {
case GST_FORMAT_BYTES:
peer_pos -= queue->cur_level.bytes;
+ if (peer_pos < 0) /* Clamp result to 0 */
+ peer_pos = 0;
break;
case GST_FORMAT_TIME:
peer_pos -= queue->cur_level.time;
+ if (peer_pos < 0) /* Clamp result to 0 */
+ peer_pos = 0;
break;
default:
GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
GST_DEBUG_OBJECT (queue, "query buffering");
- get_buffering_percent (queue, &is_buffering, &percent);
+ get_buffering_level (queue, &is_buffering, &percent);
+ percent = convert_to_buffering_percent (queue, percent);
gst_query_set_buffering_percent (query, is_buffering, percent);
get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
break;
}
- case GST_QUERY_SEEKING:
- {
- gint64 segment_start, segment_end;
- GstFormat format;
- gboolean seekable, peer_success;
-
- peer_success = gst_pad_peer_query (queue->sinkpad, query);
-
- gst_query_parse_seeking (query, &format, &seekable, &segment_start,
- &segment_end);
-
- if (peer_success && seekable) {
- GST_DEBUG_OBJECT (queue, "peer seekable (%s) from %" G_GINT64_FORMAT
- " to %" G_GINT64_FORMAT, gst_format_get_name (format),
- segment_start, segment_end);
- break;
- }
-
- if (format != GST_FORMAT_BYTES) {
- GST_DEBUG_OBJECT (queue, "query in %s (not BYTES) format, cannot seek",
- gst_format_get_name (format));
- return FALSE;
- }
-
- GST_QUEUE2_MUTEX_LOCK (queue);
- if (queue->current) {
- if (QUEUE_IS_USING_RING_BUFFER (queue)) {
- segment_start = queue->current->rb_offset;
- segment_end = queue->current->rb_writing_pos;
- } else if (QUEUE_IS_USING_TEMP_FILE (queue)) {
- segment_start = queue->current->offset;
- segment_end = queue->current->writing_pos;
- } else {
- segment_start = -1;
- segment_end = -1;
- }
- }
- GST_QUEUE2_MUTEX_UNLOCK (queue);
-
- seekable = ! !(segment_start < segment_end);
-
- GST_DEBUG_OBJECT (queue, "segment from %" G_GINT64_FORMAT " to %"
- G_GINT64_FORMAT " %sseekable", segment_start, segment_end,
- seekable ? "" : "not ");
-
- gst_query_set_seeking (query, format, seekable, segment_start,
- segment_end);
- break;
- }
default:
/* peer handled other queries */
if (!gst_pad_query_default (pad, parent, query))
queue->srcresult = GST_FLOW_FLUSHING;
queue->sinkresult = GST_FLOW_FLUSHING;
GST_QUEUE2_SIGNAL_DEL (queue);
- /* Unblock query handler */
- queue->last_query = FALSE;
- g_cond_signal (&queue->query_handled);
GST_QUEUE2_MUTEX_UNLOCK (queue);
/* wait until it is unblocked and clean up */
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (pad);
+
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ gst_queue2_locked_flush (queue, FALSE, FALSE);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
}
return result;
update_buffering (queue);
}
break;
+ case PROP_USE_TAGS_BITRATE:
+ queue->use_tags_bitrate = g_value_get_boolean (value);
+ break;
case PROP_USE_RATE_ESTIMATE:
queue->use_rate_estimate = g_value_get_boolean (value);
break;
case PROP_LOW_PERCENT:
- queue->low_percent = g_value_get_int (value);
+ queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
+ if (queue->is_buffering)
+ update_buffering (queue);
break;
case PROP_HIGH_PERCENT:
- queue->high_percent = g_value_get_int (value);
+ queue->high_watermark =
+ g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
+ if (queue->is_buffering)
+ update_buffering (queue);
+ break;
+ case PROP_LOW_WATERMARK:
+ queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
+ if (queue->is_buffering)
+ update_buffering (queue);
+ break;
+ case PROP_HIGH_WATERMARK:
+ queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
+ if (queue->is_buffering)
+ update_buffering (queue);
break;
case PROP_TEMP_TEMPLATE:
gst_queue2_set_temp_template (queue, g_value_get_string (value));
case PROP_USE_BUFFERING:
g_value_set_boolean (value, queue->use_buffering);
break;
+ case PROP_USE_TAGS_BITRATE:
+ g_value_set_boolean (value, queue->use_tags_bitrate);
+ break;
case PROP_USE_RATE_ESTIMATE:
g_value_set_boolean (value, queue->use_rate_estimate);
break;
case PROP_LOW_PERCENT:
- g_value_set_int (value, queue->low_percent);
+ g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
break;
case PROP_HIGH_PERCENT:
- g_value_set_int (value, queue->high_percent);
+ g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
+ break;
+ case PROP_LOW_WATERMARK:
+ g_value_set_double (value, queue->low_watermark /
+ (gdouble) MAX_BUFFERING_LEVEL);
+ break;
+ case PROP_HIGH_WATERMARK:
+ g_value_set_double (value, queue->high_watermark /
+ (gdouble) MAX_BUFFERING_LEVEL);
break;
case PROP_TEMP_TEMPLATE:
g_value_set_string (value, queue->temp_template);
case PROP_RING_BUFFER_MAX_SIZE:
g_value_set_uint64 (value, queue->ring_buffer_max_size);
break;
+ case PROP_AVG_IN_RATE:
+ {
+ gdouble in_rate = queue->byte_in_rate;
+
+ /* During the first RATE_INTERVAL, byte_in_rate will not have been
+ * calculated, so calculate it here. */
+ if (in_rate == 0.0 && queue->bytes_in
+ && queue->last_update_in_rates_elapsed > 0.0)
+ in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
+
+ g_value_set_int64 (value, (gint64) in_rate);
+ break;
+ }
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;