appsrc: Implement a leaky property similar to the queue element
authorSebastian Dröge <sebastian@centricular.com>
Sun, 2 May 2021 17:46:00 +0000 (20:46 +0300)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 5 May 2021 15:13:33 +0000 (15:13 +0000)
This allows dropping the newest or oldest buffer when the internal queue
is full instead of blocking or continuing to grow.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1133>

docs/plugins/gst_plugins_cache.json
gst-libs/gst/app/gstappsrc.c
gst-libs/gst/app/gstappsrc.h

index 035331058125ffca4f6d5ad5a721ff83fc378c8b..a3f7def4c2dd57d909ad6c31406719e63600ce43 100644 (file)
                         "type": "gboolean",
                         "writable": true
                     },
+                    "leaky-type": {
+                        "blurb": "Whether to drop buffers once the internal queue is full",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "none (0)",
+                        "mutable": "ready",
+                        "readable": true,
+                        "type": "GstAppLeakyType",
+                        "writable": true
+                    },
                     "max-buffers": {
                         "blurb": "The maximum number of buffers to queue internally (0 = unlimited)",
                         "conditionally-available": false,
index 73d92211ba87607bbea2b51516d306bc75fff519..4c9467602c493c08065fafd1e955113250bce38d 100644 (file)
@@ -155,6 +155,13 @@ struct _GstAppSrcPrivate
    * the next buffer of buffer list */
   gboolean pending_custom_segment;
 
+  /* the next buffer that will be queued needs a discont flag
+   * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */
+  gboolean need_discont_upstream;
+  /* the next buffer that will be dequeued needs a discont flag
+   * because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */
+  gboolean need_discont_downstream;
+
   gint64 size;
   GstClockTime duration;
   GstAppStreamType stream_type;
@@ -180,6 +187,8 @@ struct _GstAppSrcPrivate
   guint min_percent;
   gboolean handle_segment_change;
 
+  GstAppLeakyType leaky_type;
+
   Callbacks *callbacks;
 };
 
@@ -219,6 +228,7 @@ enum
 #define DEFAULT_PROP_CURRENT_LEVEL_TIME    0
 #define DEFAULT_PROP_DURATION      GST_CLOCK_TIME_NONE
 #define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE
+#define DEFAULT_PROP_LEAKY_TYPE    GST_APP_LEAKY_TYPE_NONE
 
 enum
 {
@@ -241,6 +251,7 @@ enum
   PROP_CURRENT_LEVEL_TIME,
   PROP_DURATION,
   PROP_HANDLE_SEGMENT_CHANGE,
+  PROP_LEAKY_TYPE,
   PROP_LAST
 };
 
@@ -541,6 +552,24 @@ gst_app_src_class_init (GstAppSrcClass * klass)
           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
           G_PARAM_STATIC_STRINGS));
 
+  /**
+   * GstAppSrc:leaky-type:
+   *
+   * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
+   * will drop any buffers that are pushed into it once its internal queue is
+   * full. The selected type defines whether to drop the oldest or new
+   * buffers.
+   *
+   * Since: 1.20
+   */
+  g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE,
+      g_param_spec_enum ("leaky-type", "Leaky Type",
+          "Whether to drop buffers once the internal queue is full",
+          GST_TYPE_APP_LEAKY_TYPE,
+          DEFAULT_PROP_LEAKY_TYPE,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
+          G_PARAM_STATIC_STRINGS));
+
   /**
    * GstAppSrc::need-data:
    * @appsrc: the appsrc element that emitted the signal
@@ -719,6 +748,7 @@ gst_app_src_init (GstAppSrc * appsrc)
   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
   priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
   priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE;
+  priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;
 
   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
 }
@@ -750,6 +780,8 @@ gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
   priv->queued_time = 0;
   priv->last_in_running_time = GST_CLOCK_TIME_NONE;
   priv->last_out_running_time = GST_CLOCK_TIME_NONE;
+  priv->need_discont_upstream = FALSE;
+  priv->need_discont_downstream = FALSE;
 }
 
 static void
@@ -878,6 +910,9 @@ gst_app_src_set_property (GObject * object, guint prop_id,
     case PROP_HANDLE_SEGMENT_CHANGE:
       priv->handle_segment_change = g_value_get_boolean (value);
       break;
+    case PROP_LEAKY_TYPE:
+      priv->leaky_type = g_value_get_enum (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -957,6 +992,9 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_HANDLE_SEGMENT_CHANGE:
       g_value_set_boolean (value, priv->handle_segment_change);
       break;
+    case PROP_LEAKY_TYPE:
+      g_value_set_enum (value, priv->leaky_type);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1588,12 +1626,33 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
       }
 
       if (GST_IS_BUFFER (obj)) {
-        *buf = GST_BUFFER (obj);
+        GstBuffer *buffer = GST_BUFFER (obj);
+
+        /* Mark the buffer as DISCONT if we previously dropped a buffer
+         * instead of outputting it */
+        if (priv->need_discont_downstream) {
+          buffer = gst_buffer_make_writable (buffer);
+          GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+          priv->need_discont_downstream = FALSE;
+        }
+
+        *buf = buffer;
       } else if (GST_IS_BUFFER_LIST (obj)) {
         GstBufferList *buffer_list;
 
         buffer_list = GST_BUFFER_LIST (obj);
 
+        /* Mark the first buffer of the buffer list as DISCONT if we
+         * previously dropped a buffer instead of outputting it */
+        if (priv->need_discont_downstream) {
+          GstBuffer *buffer;
+
+          buffer_list = gst_buffer_list_make_writable (buffer_list);
+          buffer = gst_buffer_list_get_writable (buffer_list, 0);
+          GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+          priv->need_discont_downstream = FALSE;
+        }
+
         gst_base_src_submit_buffer_list (bsrc, buffer_list);
         *buf = NULL;
       } else if (GST_IS_EVENT (obj)) {
@@ -2223,6 +2282,45 @@ gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
   }
 }
 
+/**
+ * gst_app_src_set_leaky_type:
+ * @appsrc: a #GstAppSrc
+ * @leaky: the #GstAppLeakyType
+ *
+ * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
+ * will drop any buffers that are pushed into it once its internal queue is
+ * full. The selected type defines whether to drop the oldest or new
+ * buffers.
+ *
+ * Since: 1.20
+ */
+void
+gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky)
+{
+  g_return_if_fail (GST_IS_APP_SRC (appsrc));
+
+  appsrc->priv->leaky_type = leaky;
+}
+
+/**
+ * gst_app_src_get_leaky_type:
+ * @appsrc: a #GstAppSrc
+ *
+ * Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type()
+ * for more details.
+ *
+ * Returns: The currently set #GstAppLeakyType.
+ *
+ * Since: 1.20
+ */
+GstAppLeakyType
+gst_app_src_get_leaky_type (GstAppSrc * appsrc)
+{
+  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE);
+
+  return appsrc->priv->leaky_type;
+}
+
 /**
  * gst_app_src_set_latency:
  * @appsrc: a #GstAppSrc
@@ -2402,6 +2500,43 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
           priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
           GST_TIME_ARGS (priv->max_time));
 
+      if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) {
+        priv->need_discont_upstream = TRUE;
+        goto dropped;
+      } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) {
+        guint i, length = gst_queue_array_get_length (priv->queue);
+        GstMiniObject *item = NULL;
+
+        /* Find the oldest buffer or buffer list and drop it, then update the
+         * limits. Dropping one is sufficient to go below the limits again.
+         */
+        for (i = 0; i < length; i++) {
+          item = gst_queue_array_peek_nth (priv->queue, i);
+          if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) {
+            gst_queue_array_drop_element (priv->queue, i);
+            break;
+          }
+          /* To not accidentally have an event after the loop */
+          item = NULL;
+        }
+
+        if (!item) {
+          GST_FIXME_OBJECT (appsrc,
+              "No buffer or buffer list queued but queue is full");
+          /* This shouldn't really happen but in this case we can't really do
+           * anything apart from accepting the buffer / bufferlist */
+          break;
+        }
+
+        GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item);
+
+        gst_app_src_update_queued_pop (appsrc, item, FALSE);
+        gst_mini_object_unref (item);
+
+        priv->need_discont_downstream = TRUE;
+        continue;
+      }
+
       if (first) {
         Callbacks *callbacks = NULL;
         gboolean emit;
@@ -2438,8 +2573,9 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
          * stops pushing buffers. */
         break;
       }
-    } else
+    } else {
       break;
+    }
   }
 
   if (priv->pending_custom_segment) {
@@ -2451,11 +2587,39 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
   }
 
   if (buflist != NULL) {
+    /* Mark the first buffer of the buffer list as DISCONT if we previously
+     * dropped a buffer instead of queueing it */
+    if (priv->need_discont_upstream) {
+      if (!steal_ref) {
+        buflist = gst_buffer_list_copy (buflist);
+        steal_ref = TRUE;
+      } else {
+        buflist = gst_buffer_list_make_writable (buflist);
+      }
+      buffer = gst_buffer_list_get_writable (buflist, 0);
+      GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+      priv->need_discont_upstream = FALSE;
+    }
+
     GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
+
     if (!steal_ref)
       gst_buffer_list_ref (buflist);
     gst_queue_array_push_tail (priv->queue, buflist);
   } else {
+    /* Mark the buffer as DISCONT if we previously dropped a buffer instead of
+     * queueing it */
+    if (priv->need_discont_upstream) {
+      if (!steal_ref) {
+        buffer = gst_buffer_copy (buffer);
+        steal_ref = TRUE;
+      } else {
+        buffer = gst_buffer_make_writable (buffer);
+      }
+      GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+      priv->need_discont_upstream = FALSE;
+    }
+
     GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
     if (!steal_ref)
       gst_buffer_ref (buffer);
@@ -2497,6 +2661,18 @@ eos:
     g_mutex_unlock (&priv->mutex);
     return GST_FLOW_EOS;
   }
+dropped:
+  {
+    GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer);
+    if (steal_ref) {
+      if (buflist)
+        gst_buffer_list_unref (buflist);
+      else
+        gst_buffer_unref (buffer);
+    }
+    g_mutex_unlock (&priv->mutex);
+    return GST_FLOW_EOS;
+  }
 }
 
 static GstFlowReturn
index b4d0041b09606c5f86834afc09caedd375dfbe89..16180fd1655e33eed36e53e69c1a5cd6fb7b1e04 100644 (file)
@@ -88,6 +88,23 @@ typedef enum
   GST_APP_STREAM_TYPE_RANDOM_ACCESS
 } GstAppStreamType;
 
+/**
+ * GstAppLeakyType:
+ * @GST_APP_LEAKY_TYPE_NONE: Not Leaky
+ * @GST_APP_LEAKY_TYPE_UPSTREAM: Leaky on upstream (new buffers)
+ * @GST_APP_LEAKY_TYPE_DOWNSTREAM: Leaky on downstream (old buffers)
+ *
+ * Buffer dropping scheme to avoid the element's internal queue to block when
+ * full.
+ *
+ * Since: 1.20
+ */
+typedef enum {
+  GST_APP_LEAKY_TYPE_NONE,
+  GST_APP_LEAKY_TYPE_UPSTREAM,
+  GST_APP_LEAKY_TYPE_DOWNSTREAM
+} GstAppLeakyType;
+
 struct _GstAppSrc
 {
   GstBaseSrc basesrc;
@@ -172,6 +189,12 @@ GstClockTime     gst_app_src_get_max_time            (GstAppSrc *appsrc);
 GST_APP_API
 GstClockTime     gst_app_src_get_current_level_time  (GstAppSrc *appsrc);
 
+GST_APP_API
+void             gst_app_src_set_leaky_type          (GstAppSrc *appsrc, GstAppLeakyType leaky);
+
+GST_APP_API
+GstAppLeakyType  gst_app_src_get_leaky_type          (GstAppSrc *appsrc);
+
 GST_APP_API
 void             gst_app_src_set_latency             (GstAppSrc *appsrc, guint64 min, guint64 max);