/**
* SECTION:element-queue2
+ * @title: queue2
*
* Data is queued until one of the limits specified by the
* #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
#define DEFAULT_USE_RATE_ESTIMATE TRUE
#define DEFAULT_LOW_PERCENT 10
#define DEFAULT_HIGH_PERCENT 99
+#define DEFAULT_LOW_WATERMARK 0.01
+#define DEFAULT_HIGH_WATERMARK 0.99
#define DEFAULT_TEMP_REMOVE TRUE
#define DEFAULT_RING_BUFFER_MAX_SIZE 0
PROP_USE_RATE_ESTIMATE,
PROP_LOW_PERCENT,
PROP_HIGH_PERCENT,
+ PROP_LOW_WATERMARK,
+ PROP_HIGH_WATERMARK,
PROP_TEMP_TEMPLATE,
PROP_TEMP_LOCATION,
PROP_TEMP_REMOVE,
PROP_LAST
};
+/* Explanation for buffer levels and percentages:
+ *
+ * The buffering_level functions here return a value in a normalized range
+ * that specifies the queue's current fill level. The range goes from 0 to
+ * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
+ *
+ * This is not to be confused with the buffering_percent value, which is
+ * a *relative* quantity - relative to the low/high watermarks.
+ * buffering_percent = 0% means buffering_level is at the low watermark.
+ * buffering_percent = 100% means buffering_level is at the high watermark.
+ * buffering_percent is used for determining if the fill level has reached
+ * the high watermark, and for producing BUFFERING messages. This value
+ * always uses a 0..100 range (since it is a percentage).
+ *
+ * To avoid future confusions, whenever "buffering level" is mentioned, it
+ * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
+ * range. Whenever "buffering_percent" is mentioned, it refers to the
+ * percentage value that is relative to the low/high watermark. */
+
+/* Using a buffering level range of 0..1000000 to allow for a
+ * resolution in ppm (1 ppm = 0.0001%) */
+#define MAX_BUFFERING_LEVEL 1000000
+
+/* How much 1% makes up in the buffer level range */
+#define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
+
#define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START { \
l.buffers = 0; \
l.bytes = 0; \
static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
static void gst_queue2_loop (GstPad * pad);
-static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
- GstEvent * event);
+static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad,
+ GstObject * parent, GstEvent * event);
static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
GstQuery * query);
static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
static void update_in_rates (GstQueue2 * queue, gboolean force);
+static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue);
static void gst_queue2_post_buffering (GstQueue2 * queue);
typedef enum
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
g_param_spec_int ("low-percent", "Low percent",
- "Low threshold for buffering to start. Only used if use-buffering is True",
- 0, 100, DEFAULT_LOW_PERCENT,
+ "Low threshold for buffering to start. Only used if use-buffering is True "
+ "(Deprecated: use low-watermark instead)",
+ 0, 100, DEFAULT_LOW_WATERMARK * 100,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
g_param_spec_int ("high-percent", "High percent",
+ "High threshold for buffering to finish. Only used if use-buffering is True "
+ "(Deprecated: use high-watermark instead)",
+ 0, 100, DEFAULT_HIGH_WATERMARK * 100,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
+ g_param_spec_double ("low-watermark", "Low watermark",
+ "Low threshold for buffering to start. Only used if use-buffering is True",
+ 0.0, 1.0, DEFAULT_LOW_WATERMARK,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
+ g_param_spec_double ("high-watermark", "High watermark",
"High threshold for buffering to finish. Only used if use-buffering is True",
- 0, 100, DEFAULT_HIGH_PERCENT,
+ 0.0, 1.0, DEFAULT_HIGH_WATERMARK,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
gst_pad_set_activatemode_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
- gst_pad_set_event_function (queue->sinkpad,
+ gst_pad_set_event_full_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
gst_pad_set_query_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
queue->use_buffering = DEFAULT_USE_BUFFERING;
queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
- queue->low_percent = DEFAULT_LOW_PERCENT;
- queue->high_percent = DEFAULT_HIGH_PERCENT;
+ queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
+ queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
}
static inline gint
-get_percent (guint64 cur_level, guint64 max_level, guint64 alt_max)
+normalize_to_buffering_level (guint64 cur_level, guint64 max_level,
+ guint64 alt_max)
{
guint64 p;
return 0;
if (alt_max > 0)
- p = gst_util_uint64_scale (cur_level, 100, MIN (max_level, alt_max));
+ p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL,
+ MIN (max_level, alt_max));
else
- p = gst_util_uint64_scale (cur_level, 100, max_level);
+ p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level);
- return MIN (p, 100);
+ return MIN (p, MAX_BUFFERING_LEVEL);
}
static gboolean
-get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering,
- gint * percent)
+get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
+ gint * buffering_level)
{
- gint perc, perc2;
+ gint buflevel, buflevel2;
- if (queue->high_percent <= 0) {
- if (percent)
- *percent = 100;
+ if (queue->high_watermark <= 0) {
+ if (buffering_level)
+ *buffering_level = MAX_BUFFERING_LEVEL;
if (is_buffering)
*is_buffering = FALSE;
return FALSE;
}
-#define GET_PERCENT(format,alt_max) \
- get_percent(queue->cur_level.format,queue->max_level.format,(alt_max))
+#define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
+ normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
if (queue->is_eos) {
/* on EOS we are always 100% full, we set the var here so that it we can
* reuse the logic below to stop buffering */
- perc = 100;
+ buflevel = MAX_BUFFERING_LEVEL;
GST_LOG_OBJECT (queue, "we are EOS");
} else {
GST_LOG_OBJECT (queue,
queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
queue->cur_level.buffers);
- /* figure out the percent we are filled, we take the max of all formats. */
+ /* figure out the buffering level we are filled, we take the max of all formats. */
if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
- perc = GET_PERCENT (bytes, 0);
+ buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0);
} else {
guint64 rb_size = queue->ring_buffer_max_size;
- perc = GET_PERCENT (bytes, rb_size);
+ buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size);
}
- perc2 = GET_PERCENT (time, 0);
- perc = MAX (perc, perc2);
+ buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0);
+ buflevel = MAX (buflevel, buflevel2);
- perc2 = GET_PERCENT (buffers, 0);
- perc = MAX (perc, perc2);
+ buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0);
+ buflevel = MAX (buflevel, buflevel2);
/* also apply the rate estimate when we need to */
if (queue->use_rate_estimate) {
- perc2 = GET_PERCENT (rate_time, 0);
- perc = MAX (perc, perc2);
+ buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0);
+ buflevel = MAX (buflevel, buflevel2);
}
/* Don't get to 0% unless we're really empty */
if (queue->cur_level.bytes > 0)
- perc = MAX (1, perc);
+ buflevel = MAX (1, buflevel);
}
-#undef GET_PERCENT
+#undef GET_BUFFER_LEVEL_FOR_QUANTITY
if (is_buffering)
*is_buffering = queue->is_buffering;
- /* scale to high percent so that it becomes the 100% mark */
- perc = perc * 100 / queue->high_percent;
- /* clip */
- if (perc > 100)
- perc = 100;
+ if (buffering_level)
+ *buffering_level = buflevel;
- if (percent)
- *percent = perc;
-
- GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering,
- perc);
+ GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering,
+ buflevel);
return TRUE;
}
+static gint
+convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level)
+{
+ int percent;
+
+ /* scale so that if buffering_level equals the high watermark,
+ * the percentage is 100% */
+ percent = buffering_level * 100 / queue->high_watermark;
+ /* clip */
+ if (percent > 100)
+ percent = 100;
+
+ return percent;
+}
+
static void
get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
gint * avg_in, gint * avg_out, gint64 * buffering_left)
}
}
-static void
-gst_queue2_post_buffering (GstQueue2 * queue)
+/* Called with the lock taken */
+static GstMessage *
+gst_queue2_get_buffering_message (GstQueue2 * queue)
{
GstMessage *msg = NULL;
- g_mutex_lock (&queue->buffering_post_lock);
- GST_QUEUE2_MUTEX_LOCK (queue);
if (queue->percent_changed) {
gint percent = queue->buffering_percent;
gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
queue->avg_out, queue->buffering_left);
}
+
+ return msg;
+}
+
+static void
+gst_queue2_post_buffering (GstQueue2 * queue)
+{
+ GstMessage *msg = NULL;
+
+ g_mutex_lock (&queue->buffering_post_lock);
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ msg = gst_queue2_get_buffering_message (queue);
GST_QUEUE2_MUTEX_UNLOCK (queue);
if (msg != NULL)
static void
update_buffering (GstQueue2 * queue)
{
- gint percent;
+ gint buffering_level, percent;
/* Ensure the variables used to calculate buffering state are up-to-date. */
if (queue->current)
update_cur_level (queue, queue->current);
update_in_rates (queue, FALSE);
- if (!get_buffering_percent (queue, NULL, &percent))
+ if (!get_buffering_level (queue, NULL, &buffering_level))
return;
+ percent = convert_to_buffering_percent (queue, buffering_level);
+
if (queue->is_buffering) {
/* if we were buffering see if we reached the high watermark */
if (percent >= 100)
} else {
/* we were not buffering, check if we need to start buffering if we drop
* below the low threshold */
- if (percent < queue->low_percent) {
+ if (buffering_level < queue->low_watermark) {
queue->is_buffering = TRUE;
SET_PERCENT (queue, percent);
}
update_cur_level (queue, queue->current);
/* update the buffering status */
- if (queue->use_buffering)
+ if (queue->use_buffering) {
+ GstMessage *msg;
update_buffering (queue);
+ msg = gst_queue2_get_buffering_message (queue);
+ if (msg) {
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+ g_mutex_lock (&queue->buffering_post_lock);
+ gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
+ g_mutex_unlock (&queue->buffering_post_lock);
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ }
+ }
GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
}
}
-static gboolean
+static GstFlowReturn
gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event)
{
} else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
if (queue->srcresult == GST_FLOW_NOT_LINKED
|| queue->srcresult < GST_FLOW_EOS) {
- GST_ELEMENT_ERROR (queue, STREAM, FAILED,
- (_("Internal data flow error.")),
- ("streaming task paused, reason %s (%d)",
- gst_flow_get_name (queue->srcresult), queue->srcresult));
+ GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
}
goto out_flow_error;
}
}
break;
}
- return ret;
+ if (ret == FALSE)
+ return GST_FLOW_ERROR;
+ return GST_FLOW_OK;
/* ERRORS */
out_flushing:
GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event);
- return FALSE;
+ return GST_FLOW_FLUSHING;
}
out_eos:
{
GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event);
- return FALSE;
+ return GST_FLOW_EOS;
}
out_flow_error:
{
gst_flow_get_name (queue->srcresult));
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event);
- return FALSE;
+ return queue->srcresult;
}
}
GST_QUEUE2_ITEM_TYPE_QUERY);
STATUS (queue, queue->sinkpad, "wait for QUERY");
- g_cond_wait (&queue->query_handled, &queue->qlock);
+ while (queue->sinkresult == GST_FLOW_OK &&
+ queue->last_handled_query != query)
+ g_cond_wait (&queue->query_handled, &queue->qlock);
+ queue->last_handled_query = NULL;
if (queue->sinkresult != GST_FLOW_OK)
goto out_flushing;
res = queue->last_query;
GstQuery *query = GST_QUERY_CAST (data);
GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
+ queue->last_handled_query = query;
queue->last_query = gst_pad_peer_query (queue->srcpad, query);
GST_LOG_OBJECT (queue->srcpad, "Peered query");
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
/* let app know about us giving up if upstream is not expected to do so */
/* EOS is already taken care of elsewhere */
if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
- GST_ELEMENT_ERROR (queue, STREAM, FAILED,
- (_("Internal data flow error.")),
- ("streaming task paused, reason %s (%d)",
- gst_flow_get_name (ret), ret));
+ GST_ELEMENT_FLOW_ERROR (queue, ret);
gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
}
return;
switch (format) {
case GST_FORMAT_BYTES:
peer_pos -= queue->cur_level.bytes;
+ if (peer_pos < 0) /* Clamp result to 0 */
+ peer_pos = 0;
break;
case GST_FORMAT_TIME:
peer_pos -= queue->cur_level.time;
+ if (peer_pos < 0) /* Clamp result to 0 */
+ peer_pos = 0;
break;
default:
GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
GST_DEBUG_OBJECT (queue, "query buffering");
- get_buffering_percent (queue, &is_buffering, &percent);
+ get_buffering_level (queue, &is_buffering, &percent);
+ percent = convert_to_buffering_percent (queue, percent);
gst_query_set_buffering_percent (query, is_buffering, percent);
get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (pad);
+
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ gst_queue2_locked_flush (queue, FALSE, FALSE);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
}
return result;
queue->use_rate_estimate = g_value_get_boolean (value);
break;
case PROP_LOW_PERCENT:
- queue->low_percent = g_value_get_int (value);
+ queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
+ if (queue->is_buffering)
+ update_buffering (queue);
break;
case PROP_HIGH_PERCENT:
- queue->high_percent = g_value_get_int (value);
+ queue->high_watermark =
+ g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
+ if (queue->is_buffering)
+ update_buffering (queue);
+ break;
+ case PROP_LOW_WATERMARK:
+ queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
+ if (queue->is_buffering)
+ update_buffering (queue);
+ break;
+ case PROP_HIGH_WATERMARK:
+ queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
+ if (queue->is_buffering)
+ update_buffering (queue);
break;
case PROP_TEMP_TEMPLATE:
gst_queue2_set_temp_template (queue, g_value_get_string (value));
g_value_set_boolean (value, queue->use_rate_estimate);
break;
case PROP_LOW_PERCENT:
- g_value_set_int (value, queue->low_percent);
+ g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
break;
case PROP_HIGH_PERCENT:
- g_value_set_int (value, queue->high_percent);
+ g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
+ break;
+ case PROP_LOW_WATERMARK:
+ g_value_set_double (value, queue->low_watermark /
+ (gdouble) MAX_BUFFERING_LEVEL);
+ break;
+ case PROP_HIGH_WATERMARK:
+ g_value_set_double (value, queue->high_watermark /
+ (gdouble) MAX_BUFFERING_LEVEL);
break;
case PROP_TEMP_TEMPLATE:
g_value_set_string (value, queue->temp_template);