appsrc: Add test for testing the max-* and leaky-type properties
authorSebastian Dröge <sebastian@centricular.com>
Mon, 3 May 2021 14:10:20 +0000 (17:10 +0300)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 5 May 2021 15:13:33 +0000 (15:13 +0000)
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1133>

tests/check/elements/appsrc.c

index b385a75..ae25429 100644 (file)
@@ -22,7 +22,7 @@
 #include "config.h"
 #endif
 
-#include <gst/check/gstcheck.h>
+#include <gst/check/check.h>
 #include <gst/app/gstappsrc.h>
 #include <gst/app/gstappsink.h>
 
@@ -1061,6 +1061,313 @@ GST_START_TEST (test_appsrc_custom_segment_twice)
 
 GST_END_TEST;
 
+static GstPadProbeReturn
+block_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+  return GST_PAD_PROBE_OK;
+}
+
+GST_START_TEST (test_appsrc_limits)
+{
+  GstHarness *h;
+  GstPad *srcpad;
+  GstBuffer *buffer;
+  gulong probe_id;
+  guint64 current_level;
+
+  /* Test if the bytes limit works correctly with both leaky types */
+  h = gst_harness_new ("appsrc");
+  g_object_set (h->element,
+      "format", GST_FORMAT_TIME,
+      "max-bytes", G_GUINT64_CONSTANT (200),
+      "max-time", G_GUINT64_CONSTANT (0),
+      "max-buffers", G_GUINT64_CONSTANT (0), "leaky-type", 1 /* upstream */ ,
+      NULL);
+  gst_harness_play (h);
+  srcpad = gst_element_get_static_pad (h->element, "src");
+
+  /* Pad probe to ensure that the source pad task is blocked and we can
+   * deterministically test the behaviour of the appsrc queue */
+  probe_id =
+      gst_pad_add_probe (srcpad,
+      GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
+      GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL);
+
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+  /* wait until the appsrc is blocked downstream */
+  while (!gst_pad_is_blocking (srcpad))
+    g_thread_yield ();
+
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+  /* The first buffer is not queued anymore but inside the pad probe */
+  g_object_get (h->element, "current-level-bytes", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 200);
+  g_object_get (h->element, "current-level-buffers", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 2);
+  g_object_get (h->element, "current-level-time", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);
+
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+  /* The new buffer was dropped now, otherwise we would have 2 seconds queued */
+  g_object_get (h->element, "current-level-bytes", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 200);
+  g_object_get (h->element, "current-level-buffers", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 2);
+  g_object_get (h->element, "current-level-time", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);
+
+  g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL);
+
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+  /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
+  g_object_get (h->element, "current-level-bytes", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 200);
+  g_object_get (h->element, "current-level-buffers", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 2);
+  /* 3s because the last dequeued buffer had an end timestamp of 0s, the
+   * buffer with timestamp 1s was dropped and the newly queued buffer has a
+   * start timestamp of 4s */
+  g_object_get (h->element, "current-level-time", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 3 * GST_SECOND);
+
+  /* Remove probe and check if we get all buffers we're supposed to get */
+  gst_pad_remove_probe (srcpad, probe_id);
+
+  buffer = gst_harness_pull (h);
+  fail_unless (buffer);
+  fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND);
+  gst_buffer_unref (buffer);
+
+  buffer = gst_harness_pull (h);
+  fail_unless (buffer);
+  fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND);
+  /* DISCONT because the buffer with 1s was dropped */
+  fail_unless (GST_BUFFER_IS_DISCONT (buffer));
+  gst_buffer_unref (buffer);
+
+  buffer = gst_harness_pull (h);
+  fail_unless (buffer);
+  fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND);
+  /* DISCONT because the first buffer with 4s was dropped */
+  fail_unless (GST_BUFFER_IS_DISCONT (buffer));
+  gst_buffer_unref (buffer);
+
+  gst_object_unref (srcpad);
+  gst_harness_teardown (h);
+
+  /* Test if the buffers limit works correctly with both leaky types */
+  h = gst_harness_new ("appsrc");
+  g_object_set (h->element,
+      "format", GST_FORMAT_TIME,
+      "max-bytes", G_GUINT64_CONSTANT (0),
+      "max-time", G_GUINT64_CONSTANT (0),
+      "max-buffers", G_GUINT64_CONSTANT (2), "leaky-type", 1 /* upstream */ ,
+      NULL);
+  gst_harness_play (h);
+  srcpad = gst_element_get_static_pad (h->element, "src");
+
+  /* Pad probe to ensure that the source pad task is blocked and we can
+   * deterministically test the behaviour of the appsrc queue */
+  probe_id =
+      gst_pad_add_probe (srcpad,
+      GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
+      GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL);
+
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+  /* wait until the appsrc is blocked downstream */
+  while (!gst_pad_is_blocking (srcpad))
+    g_thread_yield ();
+
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+  /* The first buffer is not queued anymore but inside the pad probe */
+  g_object_get (h->element, "current-level-bytes", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 200);
+  g_object_get (h->element, "current-level-buffers", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 2);
+  g_object_get (h->element, "current-level-time", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);
+
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+  /* The new buffer was dropped now, otherwise we would have 2 seconds queued */
+  g_object_get (h->element, "current-level-bytes", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 200);
+  g_object_get (h->element, "current-level-buffers", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 2);
+  g_object_get (h->element, "current-level-time", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);
+
+  g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL);
+
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+  /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
+  g_object_get (h->element, "current-level-bytes", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 200);
+  g_object_get (h->element, "current-level-buffers", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 2);
+  /* 3s because the last dequeued buffer had an end timestamp of 0s, the
+   * buffer with timestamp 1s was dropped and the newly queued buffer has a
+   * start timestamp of 4s */
+  g_object_get (h->element, "current-level-time", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 3 * GST_SECOND);
+
+  /* Remove probe and check if we get all buffers we're supposed to get */
+  gst_pad_remove_probe (srcpad, probe_id);
+
+  buffer = gst_harness_pull (h);
+  fail_unless (buffer);
+  fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND);
+  gst_buffer_unref (buffer);
+
+  buffer = gst_harness_pull (h);
+  fail_unless (buffer);
+  fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND);
+  /* DISCONT because the buffer with 1s was dropped */
+  fail_unless (GST_BUFFER_IS_DISCONT (buffer));
+  gst_buffer_unref (buffer);
+
+  buffer = gst_harness_pull (h);
+  fail_unless (buffer);
+  fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND);
+  /* DISCONT because the first buffer with 4s was dropped */
+  fail_unless (GST_BUFFER_IS_DISCONT (buffer));
+  gst_buffer_unref (buffer);
+
+  gst_object_unref (srcpad);
+  gst_harness_teardown (h);
+
+  /* Test if the time limit works correctly with both leaky types */
+  h = gst_harness_new ("appsrc");
+  g_object_set (h->element,
+      "format", GST_FORMAT_TIME,
+      "max-bytes", G_GUINT64_CONSTANT (0),
+      "max-time", 2 * GST_SECOND,
+      "max-buffers", G_GUINT64_CONSTANT (0), "leaky-type", 1 /* upstream */ ,
+      NULL);
+  gst_harness_play (h);
+  srcpad = gst_element_get_static_pad (h->element, "src");
+
+  /* Pad probe to ensure that the source pad task is blocked and we can
+   * deterministically test the behaviour of the appsrc queue */
+  probe_id =
+      gst_pad_add_probe (srcpad,
+      GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
+      GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL);
+
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
+  GST_BUFFER_DURATION (buffer) = GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+  /* wait until the appsrc is blocked downstream */
+  while (!gst_pad_is_blocking (srcpad))
+    g_thread_yield ();
+
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
+  GST_BUFFER_DURATION (buffer) = GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
+  GST_BUFFER_DURATION (buffer) = GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+  /* The first buffer is not queued anymore but inside the pad probe */
+  g_object_get (h->element, "current-level-bytes", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 200);
+  g_object_get (h->element, "current-level-buffers", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 2);
+  g_object_get (h->element, "current-level-time", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 2 * GST_SECOND);
+
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
+  GST_BUFFER_DURATION (buffer) = GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+  /* The new buffer was dropped now, otherwise we would have more than 2 seconds queued */
+  g_object_get (h->element, "current-level-bytes", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 200);
+  g_object_get (h->element, "current-level-buffers", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 2);
+  g_object_get (h->element, "current-level-time", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 2 * GST_SECOND);
+
+  g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL);
+
+  buffer = gst_buffer_new_and_alloc (100);
+  GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
+  GST_BUFFER_DURATION (buffer) = GST_SECOND;
+  gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+  /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
+  g_object_get (h->element, "current-level-bytes", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 200);
+  g_object_get (h->element, "current-level-buffers", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 2);
+  /* 3s because the last dequeued buffer had an end timestamp of 0s, the
+   * buffer with timestamp 1s was dropped and the newly queued buffer has a
+   * start timestamp of 4s */
+  g_object_get (h->element, "current-level-time", &current_level, NULL);
+  fail_unless_equals_uint64 (current_level, 3 * GST_SECOND);
+
+  /* Remove probe and check if we get all buffers we're supposed to get */
+  gst_pad_remove_probe (srcpad, probe_id);
+
+  buffer = gst_harness_pull (h);
+  fail_unless (buffer);
+  fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND);
+  gst_buffer_unref (buffer);
+
+  buffer = gst_harness_pull (h);
+  fail_unless (buffer);
+  fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND);
+  /* DISCONT because the buffer with 1s was dropped */
+  fail_unless (GST_BUFFER_IS_DISCONT (buffer));
+  gst_buffer_unref (buffer);
+
+  buffer = gst_harness_pull (h);
+  fail_unless (buffer);
+  fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND);
+  /* DISCONT because the first buffer with 4s was dropped */
+  fail_unless (GST_BUFFER_IS_DISCONT (buffer));
+  gst_buffer_unref (buffer);
+
+  gst_object_unref (srcpad);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
 static Suite *
 appsrc_suite (void)
 {
@@ -1074,6 +1381,7 @@ appsrc_suite (void)
   tcase_add_test (tc_chain, test_appsrc_push_buffer_list);
   tcase_add_test (tc_chain, test_appsrc_period_with_custom_segment);
   tcase_add_test (tc_chain, test_appsrc_custom_segment_twice);
+  tcase_add_test (tc_chain, test_appsrc_limits);
 
   if (RUNNING_ON_VALGRIND)
     tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);