#include "config.h"
#endif
-#include <gst/check/gstcheck.h>
+#include <gst/check/check.h>
#include <gst/app/gstappsrc.h>
#include <gst/app/gstappsink.h>
GST_END_TEST;
+static GstPadProbeReturn
+block_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+ return GST_PAD_PROBE_OK;
+}
+
+GST_START_TEST (test_appsrc_limits)
+{
+ GstHarness *h;
+ GstPad *srcpad;
+ GstBuffer *buffer;
+ gulong probe_id;
+ guint64 current_level;
+
+ /* Test if the bytes limit works correctly with both leaky types */
+ h = gst_harness_new ("appsrc");
+ g_object_set (h->element,
+ "format", GST_FORMAT_TIME,
+ "max-bytes", G_GUINT64_CONSTANT (200),
+ "max-time", G_GUINT64_CONSTANT (0),
+ "max-buffers", G_GUINT64_CONSTANT (0), "leaky-type", 1 /* upstream */ ,
+ NULL);
+ gst_harness_play (h);
+ srcpad = gst_element_get_static_pad (h->element, "src");
+
+ /* Pad probe to ensure that the source pad task is blocked and we can
+ * deterministically test the behaviour of the appsrc queue */
+ probe_id =
+ gst_pad_add_probe (srcpad,
+ GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
+ GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL);
+
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+ /* wait until the appsrc is blocked downstream */
+ while (!gst_pad_is_blocking (srcpad))
+ g_thread_yield ();
+
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+ /* The first buffer is not queued anymore but inside the pad probe */
+ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 200);
+ g_object_get (h->element, "current-level-buffers", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 2);
+ g_object_get (h->element, "current-level-time", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);
+
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+ /* The new buffer was dropped now, otherwise we would have 2 seconds queued */
+ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 200);
+ g_object_get (h->element, "current-level-buffers", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 2);
+ g_object_get (h->element, "current-level-time", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);
+
+ g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL);
+
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+ /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
+ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 200);
+ g_object_get (h->element, "current-level-buffers", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 2);
+ /* 3s because the last dequeued buffer had an end timestamp of 0s, the
+ * buffer with timestamp 1s was dropped and the newly queued buffer has a
+ * start timestamp of 4s */
+ g_object_get (h->element, "current-level-time", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 3 * GST_SECOND);
+
+ /* Remove probe and check if we get all buffers we're supposed to get */
+ gst_pad_remove_probe (srcpad, probe_id);
+
+ buffer = gst_harness_pull (h);
+ fail_unless (buffer);
+ fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND);
+ gst_buffer_unref (buffer);
+
+ buffer = gst_harness_pull (h);
+ fail_unless (buffer);
+ fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND);
+ /* DISCONT because the buffer with 1s was dropped */
+ fail_unless (GST_BUFFER_IS_DISCONT (buffer));
+ gst_buffer_unref (buffer);
+
+ buffer = gst_harness_pull (h);
+ fail_unless (buffer);
+ fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND);
+ /* DISCONT because the first buffer with 4s was dropped */
+ fail_unless (GST_BUFFER_IS_DISCONT (buffer));
+ gst_buffer_unref (buffer);
+
+ gst_object_unref (srcpad);
+ gst_harness_teardown (h);
+
+ /* Test if the buffers limit works correctly with both leaky types */
+ h = gst_harness_new ("appsrc");
+ g_object_set (h->element,
+ "format", GST_FORMAT_TIME,
+ "max-bytes", G_GUINT64_CONSTANT (0),
+ "max-time", G_GUINT64_CONSTANT (0),
+ "max-buffers", G_GUINT64_CONSTANT (2), "leaky-type", 1 /* upstream */ ,
+ NULL);
+ gst_harness_play (h);
+ srcpad = gst_element_get_static_pad (h->element, "src");
+
+ /* Pad probe to ensure that the source pad task is blocked and we can
+ * deterministically test the behaviour of the appsrc queue */
+ probe_id =
+ gst_pad_add_probe (srcpad,
+ GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
+ GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL);
+
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+ /* wait until the appsrc is blocked downstream */
+ while (!gst_pad_is_blocking (srcpad))
+ g_thread_yield ();
+
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+ /* The first buffer is not queued anymore but inside the pad probe */
+ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 200);
+ g_object_get (h->element, "current-level-buffers", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 2);
+ g_object_get (h->element, "current-level-time", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);
+
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+ /* The new buffer was dropped now, otherwise we would have 2 seconds queued */
+ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 200);
+ g_object_get (h->element, "current-level-buffers", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 2);
+ g_object_get (h->element, "current-level-time", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);
+
+ g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL);
+
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+ /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
+ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 200);
+ g_object_get (h->element, "current-level-buffers", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 2);
+ /* 3s because the last dequeued buffer had an end timestamp of 0s, the
+ * buffer with timestamp 1s was dropped and the newly queued buffer has a
+ * start timestamp of 4s */
+ g_object_get (h->element, "current-level-time", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 3 * GST_SECOND);
+
+ /* Remove probe and check if we get all buffers we're supposed to get */
+ gst_pad_remove_probe (srcpad, probe_id);
+
+ buffer = gst_harness_pull (h);
+ fail_unless (buffer);
+ fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND);
+ gst_buffer_unref (buffer);
+
+ buffer = gst_harness_pull (h);
+ fail_unless (buffer);
+ fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND);
+ /* DISCONT because the buffer with 1s was dropped */
+ fail_unless (GST_BUFFER_IS_DISCONT (buffer));
+ gst_buffer_unref (buffer);
+
+ buffer = gst_harness_pull (h);
+ fail_unless (buffer);
+ fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND);
+ /* DISCONT because the first buffer with 4s was dropped */
+ fail_unless (GST_BUFFER_IS_DISCONT (buffer));
+ gst_buffer_unref (buffer);
+
+ gst_object_unref (srcpad);
+ gst_harness_teardown (h);
+
+ /* Test if the time limit works correctly with both leaky types */
+ h = gst_harness_new ("appsrc");
+ g_object_set (h->element,
+ "format", GST_FORMAT_TIME,
+ "max-bytes", G_GUINT64_CONSTANT (0),
+ "max-time", 2 * GST_SECOND,
+ "max-buffers", G_GUINT64_CONSTANT (0), "leaky-type", 1 /* upstream */ ,
+ NULL);
+ gst_harness_play (h);
+ srcpad = gst_element_get_static_pad (h->element, "src");
+
+ /* Pad probe to ensure that the source pad task is blocked and we can
+ * deterministically test the behaviour of the appsrc queue */
+ probe_id =
+ gst_pad_add_probe (srcpad,
+ GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
+ GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL);
+
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
+ GST_BUFFER_DURATION (buffer) = GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+ /* wait until the appsrc is blocked downstream */
+ while (!gst_pad_is_blocking (srcpad))
+ g_thread_yield ();
+
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
+ GST_BUFFER_DURATION (buffer) = GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
+ GST_BUFFER_DURATION (buffer) = GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+ /* The first buffer is not queued anymore but inside the pad probe */
+ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 200);
+ g_object_get (h->element, "current-level-buffers", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 2);
+ g_object_get (h->element, "current-level-time", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 2 * GST_SECOND);
+
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
+ GST_BUFFER_DURATION (buffer) = GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+ /* The new buffer was dropped now, otherwise we would have more than 2 seconds queued */
+ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 200);
+ g_object_get (h->element, "current-level-buffers", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 2);
+ g_object_get (h->element, "current-level-time", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 2 * GST_SECOND);
+
+ g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL);
+
+ buffer = gst_buffer_new_and_alloc (100);
+ GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
+ GST_BUFFER_DURATION (buffer) = GST_SECOND;
+ gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
+
+ /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
+ g_object_get (h->element, "current-level-bytes", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 200);
+ g_object_get (h->element, "current-level-buffers", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 2);
+ /* 3s because the last dequeued buffer had an end timestamp of 0s, the
+ * buffer with timestamp 1s was dropped and the newly queued buffer has a
+ * start timestamp of 4s */
+ g_object_get (h->element, "current-level-time", ¤t_level, NULL);
+ fail_unless_equals_uint64 (current_level, 3 * GST_SECOND);
+
+ /* Remove probe and check if we get all buffers we're supposed to get */
+ gst_pad_remove_probe (srcpad, probe_id);
+
+ buffer = gst_harness_pull (h);
+ fail_unless (buffer);
+ fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND);
+ gst_buffer_unref (buffer);
+
+ buffer = gst_harness_pull (h);
+ fail_unless (buffer);
+ fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND);
+ /* DISCONT because the buffer with 1s was dropped */
+ fail_unless (GST_BUFFER_IS_DISCONT (buffer));
+ gst_buffer_unref (buffer);
+
+ buffer = gst_harness_pull (h);
+ fail_unless (buffer);
+ fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND);
+ /* DISCONT because the first buffer with 4s was dropped */
+ fail_unless (GST_BUFFER_IS_DISCONT (buffer));
+ gst_buffer_unref (buffer);
+
+ gst_object_unref (srcpad);
+ gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
static Suite *
appsrc_suite (void)
{
tcase_add_test (tc_chain, test_appsrc_push_buffer_list);
tcase_add_test (tc_chain, test_appsrc_period_with_custom_segment);
tcase_add_test (tc_chain, test_appsrc_custom_segment_twice);
+ tcase_add_test (tc_chain, test_appsrc_limits);
if (RUNNING_ON_VALGRIND)
tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);