appsrc: Add new max-buffers / max-time / current-level-buffers / current-level-time...
authorSebastian Dröge <sebastian@centricular.com>
Fri, 30 Apr 2021 16:22:46 +0000 (19:22 +0300)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 5 May 2021 15:13:33 +0000 (15:13 +0000)
These work the same way as the corresponding properties on queue and
allow to control the internal buffer size of the appsrc in a more
flexible way.

Unlike in queue the max-buffers and max-time properties are 0 (i.e.
disabled) by default for backwards compatibility reasons.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1133>

docs/plugins/gst_plugins_cache.json
gst-libs/gst/app/gstappsrc.c
gst-libs/gst/app/gstappsrc.h

index 285dc3b712b75270862bd4680abb0eb0d3b30fe2..035331058125ffca4f6d5ad5a721ff83fc378c8b 100644 (file)
                         "type": "GstCaps",
                         "writable": true
                     },
+                    "current-level-buffers": {
+                        "blurb": "The number of currently queued buffers",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "0",
+                        "max": "18446744073709551615",
+                        "min": "0",
+                        "mutable": "null",
+                        "readable": true,
+                        "type": "guint64",
+                        "writable": false
+                    },
                     "current-level-bytes": {
                         "blurb": "The number of currently queued bytes",
                         "conditionally-available": false,
                         "type": "guint64",
                         "writable": false
                     },
+                    "current-level-time": {
+                        "blurb": "The amount of currently queued time",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "0",
+                        "max": "18446744073709551615",
+                        "min": "0",
+                        "mutable": "null",
+                        "readable": true,
+                        "type": "guint64",
+                        "writable": false
+                    },
                     "duration": {
                         "blurb": "The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
                         "conditionally-available": false,
                         "type": "gboolean",
                         "writable": true
                     },
+                    "max-buffers": {
+                        "blurb": "The maximum number of buffers to queue internally (0 = unlimited)",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "0",
+                        "max": "18446744073709551615",
+                        "min": "0",
+                        "mutable": "null",
+                        "readable": true,
+                        "type": "guint64",
+                        "writable": true
+                    },
                     "max-bytes": {
                         "blurb": "The maximum number of bytes to queue internally (0 = unlimited)",
                         "conditionally-available": false,
                         "type": "gint64",
                         "writable": true
                     },
+                    "max-time": {
+                        "blurb": "The maximum amount of time to queue internally (0 = unlimited)",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "0",
+                        "max": "18446744073709551615",
+                        "min": "0",
+                        "mutable": "null",
+                        "readable": true,
+                        "type": "guint64",
+                        "writable": true
+                    },
                     "min-latency": {
                         "blurb": "The minimum latency (-1 = default)",
                         "conditionally-available": false,
index c2267a045a70bb8a099bf335e3bf70ebe19b311f..73d92211ba87607bbea2b51516d306bc75fff519 100644 (file)
  * streaming thread. It is important to note that data transport will not happen
  * from the thread that performed the push-buffer call.
  *
- * The "max-bytes" property controls how much data can be queued in appsrc
- * before appsrc considers the queue full. A filled internal queue will always
- * signal the "enough-data" signal, which signals the application that it should
- * stop pushing data into appsrc. The "block" property will cause appsrc to
- * block the push-buffer method until free data becomes available again.
+ * The "max-bytes", "max-buffers" and "max-time" properties control how much
+ * data can be queued in appsrc before appsrc considers the queue full. A
+ * filled internal queue will always signal the "enough-data" signal, which
+ * signals the application that it should stop pushing data into appsrc. The
+ * "block" property will cause appsrc to block the push-buffer method until
+ * free data becomes available again.
  *
  * When the internal queue is running out of data, the "need-data" signal is
  * emitted, which signals the application that it should start pushing more data
@@ -146,14 +147,18 @@ struct _GstAppSrcPrivate
 
   GstCaps *last_caps;
   GstCaps *current_caps;
+  /* last segment received on the input */
   GstSegment last_segment;
+  /* currently configured segment for the output */
   GstSegment current_segment;
+  /* queue up a segment event based on last_segment before
+   * the next buffer of buffer list */
   gboolean pending_custom_segment;
 
   gint64 size;
   GstClockTime duration;
   GstAppStreamType stream_type;
-  guint64 max_bytes;
+  guint64 max_bytes, max_buffers, max_time;
   GstFormat format;
   gboolean block;
   gchar *uri;
@@ -161,7 +166,11 @@ struct _GstAppSrcPrivate
   gboolean flushing;
   gboolean started;
   gboolean is_eos;
-  guint64 queued_bytes;
+  guint64 queued_bytes, queued_buffers;
+  /* Used to calculate the current time level */
+  GstClockTime last_in_running_time, last_out_running_time;
+  /* Updated based on the above whenever they change */
+  GstClockTime queued_time;
   guint64 offset;
   GstAppStreamType current_type;
 
@@ -196,6 +205,8 @@ enum
 #define DEFAULT_PROP_SIZE          -1
 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
 #define DEFAULT_PROP_MAX_BYTES     200000
+#define DEFAULT_PROP_MAX_BUFFERS   0
+#define DEFAULT_PROP_MAX_TIME      (0 * GST_SECOND)
 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
 #define DEFAULT_PROP_BLOCK         FALSE
 #define DEFAULT_PROP_IS_LIVE       FALSE
@@ -204,6 +215,8 @@ enum
 #define DEFAULT_PROP_EMIT_SIGNALS  TRUE
 #define DEFAULT_PROP_MIN_PERCENT   0
 #define DEFAULT_PROP_CURRENT_LEVEL_BYTES   0
+#define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS 0
+#define DEFAULT_PROP_CURRENT_LEVEL_TIME    0
 #define DEFAULT_PROP_DURATION      GST_CLOCK_TIME_NONE
 #define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE
 
@@ -214,6 +227,8 @@ enum
   PROP_SIZE,
   PROP_STREAM_TYPE,
   PROP_MAX_BYTES,
+  PROP_MAX_BUFFERS,
+  PROP_MAX_TIME,
   PROP_FORMAT,
   PROP_BLOCK,
   PROP_IS_LIVE,
@@ -222,6 +237,8 @@ enum
   PROP_EMIT_SIGNALS,
   PROP_MIN_PERCENT,
   PROP_CURRENT_LEVEL_BYTES,
+  PROP_CURRENT_LEVEL_BUFFERS,
+  PROP_CURRENT_LEVEL_TIME,
   PROP_DURATION,
   PROP_HANDLE_SEGMENT_CHANGE,
   PROP_LAST
@@ -347,6 +364,37 @@ gst_app_src_class_init (GstAppSrcClass * klass)
           "The maximum number of bytes to queue internally (0 = unlimited)",
           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstAppSrc:max-buffers:
+   *
+   * The maximum amount of buffers that can be queued internally.
+   * After the maximum amount of buffers are queued, appsrc will emit the
+   * "enough-data" signal.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
+      g_param_spec_uint64 ("max-buffers", "Max buffers",
+          "The maximum number of buffers to queue internally (0 = unlimited)",
+          0, G_MAXUINT64, DEFAULT_PROP_MAX_BUFFERS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstAppSrc:max-time:
+   *
+   * The maximum amount of time that can be queued internally.
+   * After the maximum amount of time are queued, appsrc will emit the
+   * "enough-data" signal.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property (gobject_class, PROP_MAX_TIME,
+      g_param_spec_uint64 ("max-time", "Max time",
+          "The maximum amount of time to queue internally (0 = unlimited)",
+          0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   /**
    * GstAppSrc:block:
    *
@@ -430,6 +478,32 @@ gst_app_src_class_init (GstAppSrcClass * klass)
           0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
+  /**
+   * GstAppSrc:current-level-buffers:
+   *
+   * The number of currently queued buffers inside appsrc.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS,
+      g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers",
+          "The number of currently queued buffers",
+          0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BUFFERS,
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstAppSrc:current-level-time:
+   *
+   * The amount of currently queued time inside appsrc.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME,
+      g_param_spec_uint64 ("current-level-time", "Current Level Time",
+          "The amount of currently queued time",
+          0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_TIME,
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
   /**
    * GstAppSrc:duration:
    *
@@ -636,6 +710,8 @@ gst_app_src_init (GstAppSrc * appsrc)
   priv->duration = DEFAULT_PROP_DURATION;
   priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
   priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
+  priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
+  priv->max_time = DEFAULT_PROP_MAX_TIME;
   priv->format = DEFAULT_PROP_FORMAT;
   priv->block = DEFAULT_PROP_BLOCK;
   priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
@@ -670,6 +746,10 @@ gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
   }
 
   priv->queued_bytes = 0;
+  priv->queued_buffers = 0;
+  priv->queued_time = 0;
+  priv->last_in_running_time = GST_CLOCK_TIME_NONE;
+  priv->last_out_running_time = GST_CLOCK_TIME_NONE;
 }
 
 static void
@@ -762,6 +842,12 @@ gst_app_src_set_property (GObject * object, guint prop_id,
     case PROP_MAX_BYTES:
       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
       break;
+    case PROP_MAX_BUFFERS:
+      gst_app_src_set_max_buffers (appsrc, g_value_get_uint64 (value));
+      break;
+    case PROP_MAX_TIME:
+      gst_app_src_set_max_time (appsrc, g_value_get_uint64 (value));
+      break;
     case PROP_FORMAT:
       priv->format = g_value_get_enum (value);
       break;
@@ -818,6 +904,12 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_MAX_BYTES:
       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
       break;
+    case PROP_MAX_BUFFERS:
+      g_value_set_uint64 (value, gst_app_src_get_max_buffers (appsrc));
+      break;
+    case PROP_MAX_TIME:
+      g_value_set_uint64 (value, gst_app_src_get_max_time (appsrc));
+      break;
     case PROP_FORMAT:
       g_value_set_enum (value, priv->format);
       break;
@@ -852,6 +944,13 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_CURRENT_LEVEL_BYTES:
       g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
       break;
+    case PROP_CURRENT_LEVEL_BUFFERS:
+      g_value_set_uint64 (value,
+          gst_app_src_get_current_level_buffers (appsrc));
+      break;
+    case PROP_CURRENT_LEVEL_TIME:
+      g_value_set_uint64 (value, gst_app_src_get_current_level_time (appsrc));
+      break;
     case PROP_DURATION:
       g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
       break;
@@ -1204,6 +1303,200 @@ gst_app_src_negotiate (GstBaseSrc * basesrc)
   return result;
 }
 
+/* Update the currently queued bytes/buffers/time information for the item
+ * that was just removed from the queue.
+ *
+ * If update_offset is set, additionally the offset of the source will be
+ * moved forward accordingly as if that many bytes were output.
+ */
+static void
+gst_app_src_update_queued_pop (GstAppSrc * appsrc, GstMiniObject * item,
+    gboolean update_offset)
+{
+  GstAppSrcPrivate *priv = appsrc->priv;
+  guint buf_size = 0;
+  guint n_buffers = 0;
+  GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
+
+  if (GST_IS_BUFFER (item)) {
+    GstBuffer *buf = GST_BUFFER_CAST (item);
+    buf_size = gst_buffer_get_size (buf);
+    n_buffers = 1;
+
+    end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
+    if (end_buffer_ts != GST_CLOCK_TIME_NONE
+        && GST_BUFFER_DURATION_IS_VALID (buf))
+      end_buffer_ts += GST_BUFFER_DURATION (buf);
+
+    GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", buf, buf_size);
+  } else if (GST_IS_BUFFER_LIST (item)) {
+    GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+    guint i;
+
+    n_buffers = gst_buffer_list_length (buffer_list);
+
+    for (i = 0; i < n_buffers; i++) {
+      GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
+      GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
+
+      buf_size += gst_buffer_get_size (tmp);
+      /* Update to the last buffer's timestamp that is known */
+      if (ts != GST_CLOCK_TIME_NONE) {
+        end_buffer_ts = ts;
+        if (GST_BUFFER_DURATION_IS_VALID (tmp))
+          end_buffer_ts += GST_BUFFER_DURATION (tmp);
+      }
+    }
+  }
+
+  priv->queued_bytes -= buf_size;
+  priv->queued_buffers -= n_buffers;
+
+  /* Update time level if working on a TIME segment */
+  if (priv->current_segment.format == GST_FORMAT_TIME
+      && end_buffer_ts != GST_CLOCK_TIME_NONE) {
+    /* Clip to the current segment boundaries */
+    if (priv->current_segment.stop != -1
+        && end_buffer_ts > priv->current_segment.stop)
+      end_buffer_ts = priv->current_segment.stop;
+    else if (priv->current_segment.start > end_buffer_ts)
+      end_buffer_ts = priv->current_segment.start;
+
+    priv->last_out_running_time =
+        gst_segment_to_running_time (&priv->current_segment,
+        GST_FORMAT_TIME, end_buffer_ts);
+
+    GST_TRACE_OBJECT (appsrc,
+        "Last in running time %" GST_TIME_FORMAT ", last out running time %"
+        GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
+        GST_TIME_ARGS (priv->last_out_running_time));
+
+    /* If timestamps on both sides are known, calculate the current
+     * fill level in time and consider the queue empty if the output
+     * running time is lower than the input one (i.e. some kind of reset
+     * has happened).
+     */
+    if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
+        && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
+      if (priv->last_out_running_time > priv->last_in_running_time) {
+        priv->queued_time = 0;
+      } else {
+        priv->queued_time =
+            priv->last_in_running_time - priv->last_out_running_time;
+      }
+    }
+  }
+
+  GST_DEBUG_OBJECT (appsrc,
+      "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
+      " buffers, %" GST_TIME_FORMAT, priv->queued_bytes,
+      priv->queued_buffers, GST_TIME_ARGS (priv->queued_time));
+
+  /* only update the offset when in random_access mode and when requested by
+   * the caller, i.e. not when just dropping the item */
+  if (update_offset && priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
+    priv->offset += buf_size;
+}
+
+/* Update the currently queued bytes/buffers/time information for the item
+ * that was just added to the queue.
+ */
+static void
+gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item)
+{
+  GstAppSrcPrivate *priv = appsrc->priv;
+  GstClockTime start_buffer_ts = GST_CLOCK_TIME_NONE;
+  GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
+  guint buf_size = 0;
+  guint n_buffers = 0;
+
+  if (GST_IS_BUFFER (item)) {
+    GstBuffer *buf = GST_BUFFER_CAST (item);
+
+    buf_size = gst_buffer_get_size (buf);
+    n_buffers = 1;
+
+    start_buffer_ts = end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
+    if (end_buffer_ts != GST_CLOCK_TIME_NONE
+        && GST_BUFFER_DURATION_IS_VALID (buf))
+      end_buffer_ts += GST_BUFFER_DURATION (buf);
+  } else if (GST_IS_BUFFER_LIST (item)) {
+    GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+    guint i;
+
+    n_buffers = gst_buffer_list_length (buffer_list);
+
+    for (i = 0; i < n_buffers; i++) {
+      GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
+      GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
+
+      buf_size += gst_buffer_get_size (tmp);
+
+      if (ts != GST_CLOCK_TIME_NONE) {
+        if (start_buffer_ts == GST_CLOCK_TIME_NONE)
+          start_buffer_ts = ts;
+        end_buffer_ts = ts;
+        if (GST_BUFFER_DURATION_IS_VALID (tmp))
+          end_buffer_ts += GST_BUFFER_DURATION (tmp);
+      }
+    }
+  }
+
+  priv->queued_bytes += buf_size;
+  priv->queued_buffers += n_buffers;
+
+  /* Update time level if working on a TIME segment */
+  if (priv->last_segment.format == GST_FORMAT_TIME
+      && end_buffer_ts != GST_CLOCK_TIME_NONE) {
+    /* Clip to the last segment boundaries */
+    if (priv->last_segment.stop != -1
+        && end_buffer_ts > priv->last_segment.stop)
+      end_buffer_ts = priv->last_segment.stop;
+    else if (priv->last_segment.start > end_buffer_ts)
+      end_buffer_ts = priv->last_segment.start;
+
+    priv->last_in_running_time =
+        gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
+        end_buffer_ts);
+
+    /* If this is the only buffer then we can directly update the queued time
+     * here. This is especially useful if this was the first buffer because
+     * otherwise we would have to wait until it is actually unqueued to know
+     * the queued duration */
+    if (gst_queue_array_get_length (priv->queue) == 1) {
+      if (priv->last_segment.stop != -1
+          && start_buffer_ts > priv->last_segment.stop)
+        start_buffer_ts = priv->last_segment.stop;
+      else if (priv->last_segment.start > start_buffer_ts)
+        start_buffer_ts = priv->last_segment.start;
+
+      priv->last_out_running_time =
+          gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
+          start_buffer_ts);
+    }
+
+    GST_TRACE_OBJECT (appsrc,
+        "Last in running time %" GST_TIME_FORMAT ", last out running time %"
+        GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
+        GST_TIME_ARGS (priv->last_out_running_time));
+
+    if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
+        && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
+      if (priv->last_out_running_time > priv->last_in_running_time) {
+        priv->queued_time = 0;
+      } else {
+        priv->queued_time =
+            priv->last_in_running_time - priv->last_out_running_time;
+      }
+    }
+  }
+
+  GST_DEBUG_OBJECT (appsrc,
+      "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
+      " buffers, %" GST_TIME_FORMAT, priv->queued_bytes, priv->queued_buffers,
+      GST_TIME_ARGS (priv->queued_time));
+}
+
 static GstFlowReturn
 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
     GstBuffer ** buf)
@@ -1263,7 +1556,6 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
   while (TRUE) {
     /* return data as long as we have some */
     if (!gst_queue_array_is_empty (priv->queue)) {
-      guint buf_size;
       GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
 
       if (GST_IS_CAPS (obj)) {
@@ -1297,18 +1589,11 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
 
       if (GST_IS_BUFFER (obj)) {
         *buf = GST_BUFFER (obj);
-        buf_size = gst_buffer_get_size (*buf);
-        GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", *buf, buf_size);
       } else if (GST_IS_BUFFER_LIST (obj)) {
         GstBufferList *buffer_list;
 
         buffer_list = GST_BUFFER_LIST (obj);
 
-        buf_size = gst_buffer_list_calculate_size (buffer_list);
-
-        GST_LOG_OBJECT (appsrc, "have buffer list %p of size %u, %u buffers",
-            buffer_list, buf_size, gst_buffer_list_length (buffer_list));
-
         gst_base_src_submit_buffer_list (bsrc, buffer_list);
         *buf = NULL;
       } else if (GST_IS_EVENT (obj)) {
@@ -1336,22 +1621,25 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
         g_assert_not_reached ();
       }
 
-      priv->queued_bytes -= buf_size;
-
-      /* only update the offset when in random_access mode */
-      if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
-        priv->offset += buf_size;
+      gst_app_src_update_queued_pop (appsrc, obj, TRUE);
 
       /* signal that we removed an item */
       if ((priv->wait_status & APP_WAITING))
         g_cond_broadcast (&priv->cond);
 
       /* see if we go lower than the min-percent */
-      if (priv->min_percent && priv->max_bytes) {
-        if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
+      if (priv->min_percent) {
+        if ((priv->max_bytes
+                && priv->queued_bytes * 100 / priv->max_bytes <=
+                priv->min_percent) || (priv->max_buffers
+                && priv->queued_buffers * 100 / priv->max_buffers <=
+                priv->min_percent) || (priv->max_time
+                && priv->queued_time * 100 / priv->max_time <=
+                priv->min_percent)) {
           /* ignore flushing state, we got a buffer and we will return it now.
            * Errors will be handled in the next round */
           gst_app_src_emit_need_data (appsrc, size);
+        }
       }
       ret = GST_FLOW_OK;
       break;
@@ -1717,7 +2005,7 @@ gst_app_src_get_max_bytes (GstAppSrc * appsrc)
 guint64
 gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
 {
-  gint64 queued;
+  guint64 queued;
   GstAppSrcPrivate *priv;
 
   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
@@ -1733,6 +2021,183 @@ gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
   return queued;
 }
 
+/**
+ * gst_app_src_set_max_buffers:
+ * @appsrc: a #GstAppSrc
+ * @max: the maximum number of buffers to queue
+ *
+ * Set the maximum amount of buffers that can be queued in @appsrc.
+ * After the maximum amount of buffers are queued, @appsrc will emit the
+ * "enough-data" signal.
+ *
+ * Since: 1.20
+ */
+void
+gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint64 max)
+{
+  GstAppSrcPrivate *priv;
+
+  g_return_if_fail (GST_IS_APP_SRC (appsrc));
+
+  priv = appsrc->priv;
+
+  g_mutex_lock (&priv->mutex);
+  if (max != priv->max_buffers) {
+    GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %" G_GUINT64_FORMAT, max);
+    priv->max_buffers = max;
+    /* signal the change */
+    g_cond_broadcast (&priv->cond);
+  }
+  g_mutex_unlock (&priv->mutex);
+}
+
+/**
+ * gst_app_src_get_max_buffers:
+ * @appsrc: a #GstAppSrc
+ *
+ * Get the maximum amount of buffers that can be queued in @appsrc.
+ *
+ * Returns: The maximum amount of buffers that can be queued.
+ *
+ * Since: 1.20
+ */
+guint64
+gst_app_src_get_max_buffers (GstAppSrc * appsrc)
+{
+  guint64 result;
+  GstAppSrcPrivate *priv;
+
+  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
+
+  priv = appsrc->priv;
+
+  g_mutex_lock (&priv->mutex);
+  result = priv->max_buffers;
+  GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %" G_GUINT64_FORMAT,
+      result);
+  g_mutex_unlock (&priv->mutex);
+
+  return result;
+}
+
+/**
+ * gst_app_src_get_current_level_buffers:
+ * @appsrc: a #GstAppSrc
+ *
+ * Get the number of currently queued buffers inside @appsrc.
+ *
+ * Returns: The number of currently queued buffers.
+ *
+ * Since: 1.20
+ */
+guint64
+gst_app_src_get_current_level_buffers (GstAppSrc * appsrc)
+{
+  guint64 queued;
+  GstAppSrcPrivate *priv;
+
+  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
+
+  priv = appsrc->priv;
+
+  GST_OBJECT_LOCK (appsrc);
+  queued = priv->queued_buffers;
+  GST_DEBUG_OBJECT (appsrc, "current level buffers is %" G_GUINT64_FORMAT,
+      queued);
+  GST_OBJECT_UNLOCK (appsrc);
+
+  return queued;
+}
+
+/**
+ * gst_app_src_set_max_time:
+ * @appsrc: a #GstAppSrc
+ * @max: the maximum amonut of time to queue
+ *
+ * Set the maximum amount of time that can be queued in @appsrc.
+ * After the maximum amount of time are queued, @appsrc will emit the
+ * "enough-data" signal.
+ *
+ * Since: 1.20
+ */
+void
+gst_app_src_set_max_time (GstAppSrc * appsrc, GstClockTime max)
+{
+  GstAppSrcPrivate *priv;
+
+  g_return_if_fail (GST_IS_APP_SRC (appsrc));
+
+  priv = appsrc->priv;
+
+  g_mutex_lock (&priv->mutex);
+  if (max != priv->max_time) {
+    GST_DEBUG_OBJECT (appsrc, "setting max-time to %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (max));
+    priv->max_time = max;
+    /* signal the change */
+    g_cond_broadcast (&priv->cond);
+  }
+  g_mutex_unlock (&priv->mutex);
+}
+
+/**
+ * gst_app_src_get_max_time:
+ * @appsrc: a #GstAppSrc
+ *
+ * Get the maximum amount of time that can be queued in @appsrc.
+ *
+ * Returns: The maximum amount of time that can be queued.
+ *
+ * Since: 1.20
+ */
+GstClockTime
+gst_app_src_get_max_time (GstAppSrc * appsrc)
+{
+  GstClockTime result;
+  GstAppSrcPrivate *priv;
+
+  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
+
+  priv = appsrc->priv;
+
+  g_mutex_lock (&priv->mutex);
+  result = priv->max_time;
+  GST_DEBUG_OBJECT (appsrc, "getting max-time of %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (result));
+  g_mutex_unlock (&priv->mutex);
+
+  return result;
+}
+
+/**
+ * gst_app_src_get_current_level_time:
+ * @appsrc: a #GstAppSrc
+ *
+ * Get the amount of currently queued time inside @appsrc.
+ *
+ * Returns: The amount of currently queued time.
+ *
+ * Since: 1.20
+ */
+GstClockTime
+gst_app_src_get_current_level_time (GstAppSrc * appsrc)
+{
+  gint64 queued;
+  GstAppSrcPrivate *priv;
+
+  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
+
+  priv = appsrc->priv;
+
+  GST_OBJECT_LOCK (appsrc);
+  queued = priv->queued_time;
+  GST_DEBUG_OBJECT (appsrc, "current level time is %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (queued));
+  GST_OBJECT_UNLOCK (appsrc);
+
+  return queued;
+}
+
 static void
 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
     gboolean do_max, guint64 max)
@@ -1925,10 +2390,17 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
     if (priv->is_eos)
       goto eos;
 
-    if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) {
+    if ((priv->max_bytes && priv->queued_bytes >= priv->max_bytes) ||
+        (priv->max_buffers && priv->queued_buffers >= priv->max_buffers) ||
+        (priv->max_time && priv->queued_time >= priv->max_time)) {
       GST_DEBUG_OBJECT (appsrc,
-          "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
-          priv->queued_bytes, priv->max_bytes);
+          "queue filled (queued %" G_GUINT64_FORMAT " bytes, max %"
+          G_GUINT64_FORMAT " bytes, " "queued %" G_GUINT64_FORMAT
+          " buffers, max %" G_GUINT64_FORMAT " buffers, " "queued %"
+          GST_TIME_FORMAT " time, max %" GST_TIME_FORMAT " time)",
+          priv->queued_bytes, priv->max_bytes, priv->queued_buffers,
+          priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
+          GST_TIME_ARGS (priv->max_time));
 
       if (first) {
         Callbacks *callbacks = NULL;
@@ -1983,15 +2455,16 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
     if (!steal_ref)
       gst_buffer_list_ref (buflist);
     gst_queue_array_push_tail (priv->queue, buflist);
-    priv->queued_bytes += gst_buffer_list_calculate_size (buflist);
   } else {
     GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
     if (!steal_ref)
       gst_buffer_ref (buffer);
     gst_queue_array_push_tail (priv->queue, buffer);
-    priv->queued_bytes += gst_buffer_get_size (buffer);
   }
 
+  gst_app_src_update_queued_push (appsrc,
+      buflist ? GST_MINI_OBJECT_CAST (buflist) : GST_MINI_OBJECT_CAST (buffer));
+
   if ((priv->wait_status & STREAM_WAITING))
     g_cond_broadcast (&priv->cond);
 
index 82f8c2a4e01b418a1d3f7f2d343c711bfe61b387..b4d0041b09606c5f86834afc09caedd375dfbe89 100644 (file)
@@ -154,6 +154,24 @@ guint64          gst_app_src_get_max_bytes           (GstAppSrc *appsrc);
 GST_APP_API
 guint64          gst_app_src_get_current_level_bytes (GstAppSrc *appsrc);
 
+GST_APP_API
+void             gst_app_src_set_max_buffers           (GstAppSrc *appsrc, guint64 max);
+
+GST_APP_API
+guint64          gst_app_src_get_max_buffers           (GstAppSrc *appsrc);
+
+GST_APP_API
+guint64          gst_app_src_get_current_level_buffers (GstAppSrc *appsrc);
+
+GST_APP_API
+void             gst_app_src_set_max_time            (GstAppSrc *appsrc, GstClockTime max);
+
+GST_APP_API
+GstClockTime     gst_app_src_get_max_time            (GstAppSrc *appsrc);
+
+GST_APP_API
+GstClockTime     gst_app_src_get_current_level_time  (GstAppSrc *appsrc);
+
 GST_APP_API
 void             gst_app_src_set_latency             (GstAppSrc *appsrc, guint64 min, guint64 max);