* streaming thread. It is important to note that data transport will not happen
* from the thread that performed the push-buffer call.
*
- * The "max-bytes" property controls how much data can be queued in appsrc
- * before appsrc considers the queue full. A filled internal queue will always
- * signal the "enough-data" signal, which signals the application that it should
- * stop pushing data into appsrc. The "block" property will cause appsrc to
- * block the push-buffer method until free data becomes available again.
+ * The "max-bytes", "max-buffers" and "max-time" properties control how much
+ * data can be queued in appsrc before appsrc considers the queue full. A
+ * filled internal queue will always signal the "enough-data" signal, which
+ * signals the application that it should stop pushing data into appsrc. The
+ * "block" property will cause appsrc to block the push-buffer method until
+ * free data becomes available again.
*
* When the internal queue is running out of data, the "need-data" signal is
* emitted, which signals the application that it should start pushing more data
GstCaps *last_caps;
GstCaps *current_caps;
+ /* last segment received on the input */
GstSegment last_segment;
+ /* currently configured segment for the output */
GstSegment current_segment;
+ /* queue up a segment event based on last_segment before
+ * the next buffer of buffer list */
gboolean pending_custom_segment;
gint64 size;
GstClockTime duration;
GstAppStreamType stream_type;
- guint64 max_bytes;
+ guint64 max_bytes, max_buffers, max_time;
GstFormat format;
gboolean block;
gchar *uri;
gboolean flushing;
gboolean started;
gboolean is_eos;
- guint64 queued_bytes;
+ guint64 queued_bytes, queued_buffers;
+ /* Used to calculate the current time level */
+ GstClockTime last_in_running_time, last_out_running_time;
+ /* Updated based on the above whenever they change */
+ GstClockTime queued_time;
guint64 offset;
GstAppStreamType current_type;
#define DEFAULT_PROP_SIZE -1
#define DEFAULT_PROP_STREAM_TYPE GST_APP_STREAM_TYPE_STREAM
#define DEFAULT_PROP_MAX_BYTES 200000
+#define DEFAULT_PROP_MAX_BUFFERS 0
+#define DEFAULT_PROP_MAX_TIME (0 * GST_SECOND)
#define DEFAULT_PROP_FORMAT GST_FORMAT_BYTES
#define DEFAULT_PROP_BLOCK FALSE
#define DEFAULT_PROP_IS_LIVE FALSE
#define DEFAULT_PROP_EMIT_SIGNALS TRUE
#define DEFAULT_PROP_MIN_PERCENT 0
#define DEFAULT_PROP_CURRENT_LEVEL_BYTES 0
+#define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS 0
+#define DEFAULT_PROP_CURRENT_LEVEL_TIME 0
#define DEFAULT_PROP_DURATION GST_CLOCK_TIME_NONE
#define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE
PROP_SIZE,
PROP_STREAM_TYPE,
PROP_MAX_BYTES,
+ PROP_MAX_BUFFERS,
+ PROP_MAX_TIME,
PROP_FORMAT,
PROP_BLOCK,
PROP_IS_LIVE,
PROP_EMIT_SIGNALS,
PROP_MIN_PERCENT,
PROP_CURRENT_LEVEL_BYTES,
+ PROP_CURRENT_LEVEL_BUFFERS,
+ PROP_CURRENT_LEVEL_TIME,
PROP_DURATION,
PROP_HANDLE_SEGMENT_CHANGE,
PROP_LAST
"The maximum number of bytes to queue internally (0 = unlimited)",
0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstAppSrc:max-buffers:
+ *
+ * The maximum amount of buffers that can be queued internally.
+ * After the maximum amount of buffers are queued, appsrc will emit the
+ * "enough-data" signal.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
+ g_param_spec_uint64 ("max-buffers", "Max buffers",
+ "The maximum number of buffers to queue internally (0 = unlimited)",
+ 0, G_MAXUINT64, DEFAULT_PROP_MAX_BUFFERS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstAppSrc:max-time:
+ *
+ * The maximum amount of time that can be queued internally.
+ * After the maximum amount of time are queued, appsrc will emit the
+ * "enough-data" signal.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property (gobject_class, PROP_MAX_TIME,
+ g_param_spec_uint64 ("max-time", "Max time",
+ "The maximum amount of time to queue internally (0 = unlimited)",
+ 0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
/**
* GstAppSrc:block:
*
0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstAppSrc:current-level-buffers:
+ *
+ * The number of currently queued buffers inside appsrc.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS,
+ g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers",
+ "The number of currently queued buffers",
+ 0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BUFFERS,
+ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstAppSrc:current-level-time:
+ *
+ * The amount of currently queued time inside appsrc.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME,
+ g_param_spec_uint64 ("current-level-time", "Current Level Time",
+ "The amount of currently queued time",
+ 0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_TIME,
+ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
/**
* GstAppSrc:duration:
*
priv->duration = DEFAULT_PROP_DURATION;
priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
+ priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
+ priv->max_time = DEFAULT_PROP_MAX_TIME;
priv->format = DEFAULT_PROP_FORMAT;
priv->block = DEFAULT_PROP_BLOCK;
priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
}
priv->queued_bytes = 0;
+ priv->queued_buffers = 0;
+ priv->queued_time = 0;
+ priv->last_in_running_time = GST_CLOCK_TIME_NONE;
+ priv->last_out_running_time = GST_CLOCK_TIME_NONE;
}
static void
case PROP_MAX_BYTES:
gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
break;
+ case PROP_MAX_BUFFERS:
+ gst_app_src_set_max_buffers (appsrc, g_value_get_uint64 (value));
+ break;
+ case PROP_MAX_TIME:
+ gst_app_src_set_max_time (appsrc, g_value_get_uint64 (value));
+ break;
case PROP_FORMAT:
priv->format = g_value_get_enum (value);
break;
case PROP_MAX_BYTES:
g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
break;
+ case PROP_MAX_BUFFERS:
+ g_value_set_uint64 (value, gst_app_src_get_max_buffers (appsrc));
+ break;
+ case PROP_MAX_TIME:
+ g_value_set_uint64 (value, gst_app_src_get_max_time (appsrc));
+ break;
case PROP_FORMAT:
g_value_set_enum (value, priv->format);
break;
case PROP_CURRENT_LEVEL_BYTES:
g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
break;
+ case PROP_CURRENT_LEVEL_BUFFERS:
+ g_value_set_uint64 (value,
+ gst_app_src_get_current_level_buffers (appsrc));
+ break;
+ case PROP_CURRENT_LEVEL_TIME:
+ g_value_set_uint64 (value, gst_app_src_get_current_level_time (appsrc));
+ break;
case PROP_DURATION:
g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
break;
return result;
}
+/* Update the currently queued bytes/buffers/time information for the item
+ * that was just removed from the queue.
+ *
+ * If update_offset is set, additionally the offset of the source will be
+ * moved forward accordingly as if that many bytes were output.
+ */
+static void
+gst_app_src_update_queued_pop (GstAppSrc * appsrc, GstMiniObject * item,
+ gboolean update_offset)
+{
+ GstAppSrcPrivate *priv = appsrc->priv;
+ guint buf_size = 0;
+ guint n_buffers = 0;
+ GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
+
+ if (GST_IS_BUFFER (item)) {
+ GstBuffer *buf = GST_BUFFER_CAST (item);
+ buf_size = gst_buffer_get_size (buf);
+ n_buffers = 1;
+
+ end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
+ if (end_buffer_ts != GST_CLOCK_TIME_NONE
+ && GST_BUFFER_DURATION_IS_VALID (buf))
+ end_buffer_ts += GST_BUFFER_DURATION (buf);
+
+ GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", buf, buf_size);
+ } else if (GST_IS_BUFFER_LIST (item)) {
+ GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+ guint i;
+
+ n_buffers = gst_buffer_list_length (buffer_list);
+
+ for (i = 0; i < n_buffers; i++) {
+ GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
+ GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
+
+ buf_size += gst_buffer_get_size (tmp);
+ /* Update to the last buffer's timestamp that is known */
+ if (ts != GST_CLOCK_TIME_NONE) {
+ end_buffer_ts = ts;
+ if (GST_BUFFER_DURATION_IS_VALID (tmp))
+ end_buffer_ts += GST_BUFFER_DURATION (tmp);
+ }
+ }
+ }
+
+ priv->queued_bytes -= buf_size;
+ priv->queued_buffers -= n_buffers;
+
+ /* Update time level if working on a TIME segment */
+ if (priv->current_segment.format == GST_FORMAT_TIME
+ && end_buffer_ts != GST_CLOCK_TIME_NONE) {
+ /* Clip to the current segment boundaries */
+ if (priv->current_segment.stop != -1
+ && end_buffer_ts > priv->current_segment.stop)
+ end_buffer_ts = priv->current_segment.stop;
+ else if (priv->current_segment.start > end_buffer_ts)
+ end_buffer_ts = priv->current_segment.start;
+
+ priv->last_out_running_time =
+ gst_segment_to_running_time (&priv->current_segment,
+ GST_FORMAT_TIME, end_buffer_ts);
+
+ GST_TRACE_OBJECT (appsrc,
+ "Last in running time %" GST_TIME_FORMAT ", last out running time %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
+ GST_TIME_ARGS (priv->last_out_running_time));
+
+ /* If timestamps on both sides are known, calculate the current
+ * fill level in time and consider the queue empty if the output
+ * running time is lower than the input one (i.e. some kind of reset
+ * has happened).
+ */
+ if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
+ && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
+ if (priv->last_out_running_time > priv->last_in_running_time) {
+ priv->queued_time = 0;
+ } else {
+ priv->queued_time =
+ priv->last_in_running_time - priv->last_out_running_time;
+ }
+ }
+ }
+
+ GST_DEBUG_OBJECT (appsrc,
+ "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
+ " buffers, %" GST_TIME_FORMAT, priv->queued_bytes,
+ priv->queued_buffers, GST_TIME_ARGS (priv->queued_time));
+
+ /* only update the offset when in random_access mode and when requested by
+ * the caller, i.e. not when just dropping the item */
+ if (update_offset && priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
+ priv->offset += buf_size;
+}
+
+/* Update the currently queued bytes/buffers/time information for the item
+ * that was just added to the queue.
+ */
+static void
+gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item)
+{
+ GstAppSrcPrivate *priv = appsrc->priv;
+ GstClockTime start_buffer_ts = GST_CLOCK_TIME_NONE;
+ GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
+ guint buf_size = 0;
+ guint n_buffers = 0;
+
+ if (GST_IS_BUFFER (item)) {
+ GstBuffer *buf = GST_BUFFER_CAST (item);
+
+ buf_size = gst_buffer_get_size (buf);
+ n_buffers = 1;
+
+ start_buffer_ts = end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
+ if (end_buffer_ts != GST_CLOCK_TIME_NONE
+ && GST_BUFFER_DURATION_IS_VALID (buf))
+ end_buffer_ts += GST_BUFFER_DURATION (buf);
+ } else if (GST_IS_BUFFER_LIST (item)) {
+ GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+ guint i;
+
+ n_buffers = gst_buffer_list_length (buffer_list);
+
+ for (i = 0; i < n_buffers; i++) {
+ GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
+ GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
+
+ buf_size += gst_buffer_get_size (tmp);
+
+ if (ts != GST_CLOCK_TIME_NONE) {
+ if (start_buffer_ts == GST_CLOCK_TIME_NONE)
+ start_buffer_ts = ts;
+ end_buffer_ts = ts;
+ if (GST_BUFFER_DURATION_IS_VALID (tmp))
+ end_buffer_ts += GST_BUFFER_DURATION (tmp);
+ }
+ }
+ }
+
+ priv->queued_bytes += buf_size;
+ priv->queued_buffers += n_buffers;
+
+ /* Update time level if working on a TIME segment */
+ if (priv->last_segment.format == GST_FORMAT_TIME
+ && end_buffer_ts != GST_CLOCK_TIME_NONE) {
+ /* Clip to the last segment boundaries */
+ if (priv->last_segment.stop != -1
+ && end_buffer_ts > priv->last_segment.stop)
+ end_buffer_ts = priv->last_segment.stop;
+ else if (priv->last_segment.start > end_buffer_ts)
+ end_buffer_ts = priv->last_segment.start;
+
+ priv->last_in_running_time =
+ gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
+ end_buffer_ts);
+
+ /* If this is the only buffer then we can directly update the queued time
+ * here. This is especially useful if this was the first buffer because
+ * otherwise we would have to wait until it is actually unqueued to know
+ * the queued duration */
+ if (gst_queue_array_get_length (priv->queue) == 1) {
+ if (priv->last_segment.stop != -1
+ && start_buffer_ts > priv->last_segment.stop)
+ start_buffer_ts = priv->last_segment.stop;
+ else if (priv->last_segment.start > start_buffer_ts)
+ start_buffer_ts = priv->last_segment.start;
+
+ priv->last_out_running_time =
+ gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
+ start_buffer_ts);
+ }
+
+ GST_TRACE_OBJECT (appsrc,
+ "Last in running time %" GST_TIME_FORMAT ", last out running time %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
+ GST_TIME_ARGS (priv->last_out_running_time));
+
+ if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
+ && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
+ if (priv->last_out_running_time > priv->last_in_running_time) {
+ priv->queued_time = 0;
+ } else {
+ priv->queued_time =
+ priv->last_in_running_time - priv->last_out_running_time;
+ }
+ }
+ }
+
+ GST_DEBUG_OBJECT (appsrc,
+ "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
+ " buffers, %" GST_TIME_FORMAT, priv->queued_bytes, priv->queued_buffers,
+ GST_TIME_ARGS (priv->queued_time));
+}
+
static GstFlowReturn
gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
GstBuffer ** buf)
while (TRUE) {
/* return data as long as we have some */
if (!gst_queue_array_is_empty (priv->queue)) {
- guint buf_size;
GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
if (GST_IS_CAPS (obj)) {
if (GST_IS_BUFFER (obj)) {
*buf = GST_BUFFER (obj);
- buf_size = gst_buffer_get_size (*buf);
- GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", *buf, buf_size);
} else if (GST_IS_BUFFER_LIST (obj)) {
GstBufferList *buffer_list;
buffer_list = GST_BUFFER_LIST (obj);
- buf_size = gst_buffer_list_calculate_size (buffer_list);
-
- GST_LOG_OBJECT (appsrc, "have buffer list %p of size %u, %u buffers",
- buffer_list, buf_size, gst_buffer_list_length (buffer_list));
-
gst_base_src_submit_buffer_list (bsrc, buffer_list);
*buf = NULL;
} else if (GST_IS_EVENT (obj)) {
g_assert_not_reached ();
}
- priv->queued_bytes -= buf_size;
-
- /* only update the offset when in random_access mode */
- if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
- priv->offset += buf_size;
+ gst_app_src_update_queued_pop (appsrc, obj, TRUE);
/* signal that we removed an item */
if ((priv->wait_status & APP_WAITING))
g_cond_broadcast (&priv->cond);
/* see if we go lower than the min-percent */
- if (priv->min_percent && priv->max_bytes) {
- if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
+ if (priv->min_percent) {
+ if ((priv->max_bytes
+ && priv->queued_bytes * 100 / priv->max_bytes <=
+ priv->min_percent) || (priv->max_buffers
+ && priv->queued_buffers * 100 / priv->max_buffers <=
+ priv->min_percent) || (priv->max_time
+ && priv->queued_time * 100 / priv->max_time <=
+ priv->min_percent)) {
/* ignore flushing state, we got a buffer and we will return it now.
* Errors will be handled in the next round */
gst_app_src_emit_need_data (appsrc, size);
+ }
}
ret = GST_FLOW_OK;
break;
guint64
gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
{
- gint64 queued;
+ guint64 queued;
GstAppSrcPrivate *priv;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
return queued;
}
+/**
+ * gst_app_src_set_max_buffers:
+ * @appsrc: a #GstAppSrc
+ * @max: the maximum number of buffers to queue
+ *
+ * Set the maximum amount of buffers that can be queued in @appsrc.
+ * After the maximum amount of buffers are queued, @appsrc will emit the
+ * "enough-data" signal.
+ *
+ * Since: 1.20
+ */
+void
+gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint64 max)
+{
+ GstAppSrcPrivate *priv;
+
+ g_return_if_fail (GST_IS_APP_SRC (appsrc));
+
+ priv = appsrc->priv;
+
+ g_mutex_lock (&priv->mutex);
+ if (max != priv->max_buffers) {
+ GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %" G_GUINT64_FORMAT, max);
+ priv->max_buffers = max;
+ /* signal the change */
+ g_cond_broadcast (&priv->cond);
+ }
+ g_mutex_unlock (&priv->mutex);
+}
+
+/**
+ * gst_app_src_get_max_buffers:
+ * @appsrc: a #GstAppSrc
+ *
+ * Get the maximum amount of buffers that can be queued in @appsrc.
+ *
+ * Returns: The maximum amount of buffers that can be queued.
+ *
+ * Since: 1.20
+ */
+guint64
+gst_app_src_get_max_buffers (GstAppSrc * appsrc)
+{
+ guint64 result;
+ GstAppSrcPrivate *priv;
+
+ g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
+
+ priv = appsrc->priv;
+
+ g_mutex_lock (&priv->mutex);
+ result = priv->max_buffers;
+ GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %" G_GUINT64_FORMAT,
+ result);
+ g_mutex_unlock (&priv->mutex);
+
+ return result;
+}
+
+/**
+ * gst_app_src_get_current_level_buffers:
+ * @appsrc: a #GstAppSrc
+ *
+ * Get the number of currently queued buffers inside @appsrc.
+ *
+ * Returns: The number of currently queued buffers.
+ *
+ * Since: 1.20
+ */
+guint64
+gst_app_src_get_current_level_buffers (GstAppSrc * appsrc)
+{
+ guint64 queued;
+ GstAppSrcPrivate *priv;
+
+ g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
+
+ priv = appsrc->priv;
+
+ GST_OBJECT_LOCK (appsrc);
+ queued = priv->queued_buffers;
+ GST_DEBUG_OBJECT (appsrc, "current level buffers is %" G_GUINT64_FORMAT,
+ queued);
+ GST_OBJECT_UNLOCK (appsrc);
+
+ return queued;
+}
+
+/**
+ * gst_app_src_set_max_time:
+ * @appsrc: a #GstAppSrc
+ * @max: the maximum amonut of time to queue
+ *
+ * Set the maximum amount of time that can be queued in @appsrc.
+ * After the maximum amount of time are queued, @appsrc will emit the
+ * "enough-data" signal.
+ *
+ * Since: 1.20
+ */
+void
+gst_app_src_set_max_time (GstAppSrc * appsrc, GstClockTime max)
+{
+ GstAppSrcPrivate *priv;
+
+ g_return_if_fail (GST_IS_APP_SRC (appsrc));
+
+ priv = appsrc->priv;
+
+ g_mutex_lock (&priv->mutex);
+ if (max != priv->max_time) {
+ GST_DEBUG_OBJECT (appsrc, "setting max-time to %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (max));
+ priv->max_time = max;
+ /* signal the change */
+ g_cond_broadcast (&priv->cond);
+ }
+ g_mutex_unlock (&priv->mutex);
+}
+
+/**
+ * gst_app_src_get_max_time:
+ * @appsrc: a #GstAppSrc
+ *
+ * Get the maximum amount of time that can be queued in @appsrc.
+ *
+ * Returns: The maximum amount of time that can be queued.
+ *
+ * Since: 1.20
+ */
+GstClockTime
+gst_app_src_get_max_time (GstAppSrc * appsrc)
+{
+ GstClockTime result;
+ GstAppSrcPrivate *priv;
+
+ g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
+
+ priv = appsrc->priv;
+
+ g_mutex_lock (&priv->mutex);
+ result = priv->max_time;
+ GST_DEBUG_OBJECT (appsrc, "getting max-time of %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (result));
+ g_mutex_unlock (&priv->mutex);
+
+ return result;
+}
+
+/**
+ * gst_app_src_get_current_level_time:
+ * @appsrc: a #GstAppSrc
+ *
+ * Get the amount of currently queued time inside @appsrc.
+ *
+ * Returns: The amount of currently queued time.
+ *
+ * Since: 1.20
+ */
+GstClockTime
+gst_app_src_get_current_level_time (GstAppSrc * appsrc)
+{
+ gint64 queued;
+ GstAppSrcPrivate *priv;
+
+ g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
+
+ priv = appsrc->priv;
+
+ GST_OBJECT_LOCK (appsrc);
+ queued = priv->queued_time;
+ GST_DEBUG_OBJECT (appsrc, "current level time is %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (queued));
+ GST_OBJECT_UNLOCK (appsrc);
+
+ return queued;
+}
+
static void
gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
gboolean do_max, guint64 max)
if (priv->is_eos)
goto eos;
- if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) {
+ if ((priv->max_bytes && priv->queued_bytes >= priv->max_bytes) ||
+ (priv->max_buffers && priv->queued_buffers >= priv->max_buffers) ||
+ (priv->max_time && priv->queued_time >= priv->max_time)) {
GST_DEBUG_OBJECT (appsrc,
- "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
- priv->queued_bytes, priv->max_bytes);
+ "queue filled (queued %" G_GUINT64_FORMAT " bytes, max %"
+ G_GUINT64_FORMAT " bytes, " "queued %" G_GUINT64_FORMAT
+ " buffers, max %" G_GUINT64_FORMAT " buffers, " "queued %"
+ GST_TIME_FORMAT " time, max %" GST_TIME_FORMAT " time)",
+ priv->queued_bytes, priv->max_bytes, priv->queued_buffers,
+ priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
+ GST_TIME_ARGS (priv->max_time));
if (first) {
Callbacks *callbacks = NULL;
if (!steal_ref)
gst_buffer_list_ref (buflist);
gst_queue_array_push_tail (priv->queue, buflist);
- priv->queued_bytes += gst_buffer_list_calculate_size (buflist);
} else {
GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
if (!steal_ref)
gst_buffer_ref (buffer);
gst_queue_array_push_tail (priv->queue, buffer);
- priv->queued_bytes += gst_buffer_get_size (buffer);
}
+ gst_app_src_update_queued_push (appsrc,
+ buflist ? GST_MINI_OBJECT_CAST (buflist) : GST_MINI_OBJECT_CAST (buffer));
+
if ((priv->wait_status & STREAM_WAITING))
g_cond_broadcast (&priv->cond);