* the next buffer of buffer list */
gboolean pending_custom_segment;
+ /* the next buffer that will be queued needs a discont flag
+ * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */
+ gboolean need_discont_upstream;
+ /* the next buffer that will be dequeued needs a discont flag
+ * because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */
+ gboolean need_discont_downstream;
+
gint64 size;
GstClockTime duration;
GstAppStreamType stream_type;
guint min_percent;
gboolean handle_segment_change;
+ GstAppLeakyType leaky_type;
+
Callbacks *callbacks;
};
#define DEFAULT_PROP_CURRENT_LEVEL_TIME 0
#define DEFAULT_PROP_DURATION GST_CLOCK_TIME_NONE
#define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE
+#define DEFAULT_PROP_LEAKY_TYPE GST_APP_LEAKY_TYPE_NONE
enum
{
PROP_CURRENT_LEVEL_TIME,
PROP_DURATION,
PROP_HANDLE_SEGMENT_CHANGE,
+ PROP_LEAKY_TYPE,
PROP_LAST
};
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
+ /**
+ * GstAppSrc:leaky-type:
+ *
+ * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
+ * will drop any buffers that are pushed into it once its internal queue is
+ * full. The selected type defines whether to drop the oldest or new
+ * buffers.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE,
+ g_param_spec_enum ("leaky-type", "Leaky Type",
+ "Whether to drop buffers once the internal queue is full",
+ GST_TYPE_APP_LEAKY_TYPE,
+ DEFAULT_PROP_LEAKY_TYPE,
+ G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
+ G_PARAM_STATIC_STRINGS));
+
/**
* GstAppSrc::need-data:
* @appsrc: the appsrc element that emitted the signal
priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE;
+ priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;
gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
}
priv->queued_time = 0;
priv->last_in_running_time = GST_CLOCK_TIME_NONE;
priv->last_out_running_time = GST_CLOCK_TIME_NONE;
+ priv->need_discont_upstream = FALSE;
+ priv->need_discont_downstream = FALSE;
}
static void
case PROP_HANDLE_SEGMENT_CHANGE:
priv->handle_segment_change = g_value_get_boolean (value);
break;
+ case PROP_LEAKY_TYPE:
+ priv->leaky_type = g_value_get_enum (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_HANDLE_SEGMENT_CHANGE:
g_value_set_boolean (value, priv->handle_segment_change);
break;
+ case PROP_LEAKY_TYPE:
+ g_value_set_enum (value, priv->leaky_type);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
if (GST_IS_BUFFER (obj)) {
- *buf = GST_BUFFER (obj);
+ GstBuffer *buffer = GST_BUFFER (obj);
+
+ /* Mark the buffer as DISCONT if we previously dropped a buffer
+ * instead of outputting it */
+ if (priv->need_discont_downstream) {
+ buffer = gst_buffer_make_writable (buffer);
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+ priv->need_discont_downstream = FALSE;
+ }
+
+ *buf = buffer;
} else if (GST_IS_BUFFER_LIST (obj)) {
GstBufferList *buffer_list;
buffer_list = GST_BUFFER_LIST (obj);
+ /* Mark the first buffer of the buffer list as DISCONT if we
+ * previously dropped a buffer instead of outputting it */
+ if (priv->need_discont_downstream) {
+ GstBuffer *buffer;
+
+ buffer_list = gst_buffer_list_make_writable (buffer_list);
+ buffer = gst_buffer_list_get_writable (buffer_list, 0);
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+ priv->need_discont_downstream = FALSE;
+ }
+
gst_base_src_submit_buffer_list (bsrc, buffer_list);
*buf = NULL;
} else if (GST_IS_EVENT (obj)) {
}
}
+/**
+ * gst_app_src_set_leaky_type:
+ * @appsrc: a #GstAppSrc
+ * @leaky: the #GstAppLeakyType
+ *
+ * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
+ * will drop any buffers that are pushed into it once its internal queue is
+ * full. The selected type defines whether to drop the oldest or new
+ * buffers.
+ *
+ * Since: 1.20
+ */
+void
+gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky)
+{
+ g_return_if_fail (GST_IS_APP_SRC (appsrc));
+
+ appsrc->priv->leaky_type = leaky;
+}
+
+/**
+ * gst_app_src_get_leaky_type:
+ * @appsrc: a #GstAppSrc
+ *
+ * Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type()
+ * for more details.
+ *
+ * Returns: The currently set #GstAppLeakyType.
+ *
+ * Since: 1.20
+ */
+GstAppLeakyType
+gst_app_src_get_leaky_type (GstAppSrc * appsrc)
+{
+ g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE);
+
+ return appsrc->priv->leaky_type;
+}
+
/**
* gst_app_src_set_latency:
* @appsrc: a #GstAppSrc
priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
GST_TIME_ARGS (priv->max_time));
+ if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) {
+ priv->need_discont_upstream = TRUE;
+ goto dropped;
+ } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) {
+ guint i, length = gst_queue_array_get_length (priv->queue);
+ GstMiniObject *item = NULL;
+
+ /* Find the oldest buffer or buffer list and drop it, then update the
+ * limits. Dropping one is sufficient to go below the limits again.
+ */
+ for (i = 0; i < length; i++) {
+ item = gst_queue_array_peek_nth (priv->queue, i);
+ if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) {
+ gst_queue_array_drop_element (priv->queue, i);
+ break;
+ }
+ /* To not accidentally have an event after the loop */
+ item = NULL;
+ }
+
+ if (!item) {
+ GST_FIXME_OBJECT (appsrc,
+ "No buffer or buffer list queued but queue is full");
+ /* This shouldn't really happen but in this case we can't really do
+ * anything apart from accepting the buffer / bufferlist */
+ break;
+ }
+
+ GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item);
+
+ gst_app_src_update_queued_pop (appsrc, item, FALSE);
+ gst_mini_object_unref (item);
+
+ priv->need_discont_downstream = TRUE;
+ continue;
+ }
+
if (first) {
Callbacks *callbacks = NULL;
gboolean emit;
* stops pushing buffers. */
break;
}
- } else
+ } else {
break;
+ }
}
if (priv->pending_custom_segment) {
}
if (buflist != NULL) {
+ /* Mark the first buffer of the buffer list as DISCONT if we previously
+ * dropped a buffer instead of queueing it */
+ if (priv->need_discont_upstream) {
+ if (!steal_ref) {
+ buflist = gst_buffer_list_copy (buflist);
+ steal_ref = TRUE;
+ } else {
+ buflist = gst_buffer_list_make_writable (buflist);
+ }
+ buffer = gst_buffer_list_get_writable (buflist, 0);
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+ priv->need_discont_upstream = FALSE;
+ }
+
GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
+
if (!steal_ref)
gst_buffer_list_ref (buflist);
gst_queue_array_push_tail (priv->queue, buflist);
} else {
+ /* Mark the buffer as DISCONT if we previously dropped a buffer instead of
+ * queueing it */
+ if (priv->need_discont_upstream) {
+ if (!steal_ref) {
+ buffer = gst_buffer_copy (buffer);
+ steal_ref = TRUE;
+ } else {
+ buffer = gst_buffer_make_writable (buffer);
+ }
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+ priv->need_discont_upstream = FALSE;
+ }
+
GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
if (!steal_ref)
gst_buffer_ref (buffer);
g_mutex_unlock (&priv->mutex);
return GST_FLOW_EOS;
}
+dropped:
+ {
+ GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer);
+ if (steal_ref) {
+ if (buflist)
+ gst_buffer_list_unref (buflist);
+ else
+ gst_buffer_unref (buffer);
+ }
+ g_mutex_unlock (&priv->mutex);
+ return GST_FLOW_EOS;
+ }
}
static GstFlowReturn