tests: multiqueue: test to check queue overrun with pts=none
authorThiago Santos <ts.santos@sisa.samsung.com>
Thu, 8 May 2014 20:33:37 +0000 (17:33 -0300)
committerThiago Santos <ts.santos@sisa.samsung.com>
Thu, 8 May 2014 20:56:26 +0000 (17:56 -0300)
Checks if buffers with pts=none can break the queue time size limit
and allow more buffers than expected

tests/check/elements/multiqueue.c

index ad2624e..4ecd07c 100644 (file)
@@ -816,6 +816,137 @@ GST_START_TEST (test_limit_changes)
 
 GST_END_TEST;
 
+static GMutex block_mutex;
+static GCond block_cond;
+static gint unblock_count;
+static gboolean expect_overrun;
+
+static GstFlowReturn
+pad_chain_block (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+  g_mutex_lock (&block_mutex);
+  while (unblock_count == 0) {
+    g_cond_wait (&block_cond, &block_mutex);
+  }
+  if (unblock_count > 0) {
+    unblock_count--;
+  }
+  g_mutex_unlock (&block_mutex);
+
+  gst_buffer_unref (buffer);
+  return GST_FLOW_OK;
+}
+
+static gboolean
+pad_event_always_ok (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  gst_event_unref (event);
+  return TRUE;
+}
+
+static void
+mq_overrun (GstElement * mq, gpointer udata)
+{
+  fail_unless (expect_overrun);
+
+  /* unblock always so we don't get stuck */
+  g_mutex_lock (&block_mutex);
+  unblock_count = 2;            /* let the PTS=0 and PTS=none go */
+  g_cond_signal (&block_cond);
+  g_mutex_unlock (&block_mutex);
+}
+
+GST_START_TEST (test_buffering_with_none_pts)
+{
+  /*
+   * This test creates a multiqueue where source pushing blocks so we can check
+   * how its buffering level is reacting to GST_CLOCK_TIME_NONE buffers
+   * mixed with properly timestamped buffers.
+   *
+   * Sequence of pushing:
+   * pts=0
+   * pts=none
+   * pts=1 (it gets full now)
+   * pts=none (overrun expected)
+   */
+  GstElement *mq;
+  GstPad *inputpad;
+  GstPad *outputpad;
+  GstPad *mq_sinkpad;
+  GstPad *mq_srcpad;
+  GstSegment segment;
+  GstBuffer *buffer;
+
+  g_mutex_init (&block_mutex);
+  g_cond_init (&block_cond);
+  unblock_count = 0;
+  expect_overrun = FALSE;
+
+  mq = gst_element_factory_make ("multiqueue", NULL);
+  fail_unless (mq != NULL);
+
+  g_object_set (mq,
+      "max-size-bytes", (guint) 0,
+      "max-size-buffers", (guint) 0,
+      "max-size-time", (guint64) GST_SECOND, NULL);
+  g_signal_connect (mq, "overrun", (GCallback) mq_overrun, NULL);
+
+  gst_segment_init (&segment, GST_FORMAT_TIME);
+
+  inputpad = gst_pad_new ("dummysrc", GST_PAD_SRC);
+  outputpad = gst_pad_new ("dummysink", GST_PAD_SINK);
+  gst_pad_set_chain_function (outputpad, pad_chain_block);
+  gst_pad_set_event_function (outputpad, pad_event_always_ok);
+  mq_sinkpad = gst_element_get_request_pad (mq, "sink_%u");
+  mq_srcpad = gst_element_get_static_pad (mq, "src_0");
+  fail_unless (mq_sinkpad != NULL);
+  fail_unless (gst_pad_link (inputpad, mq_sinkpad) == GST_PAD_LINK_OK);
+  fail_unless (gst_pad_link (mq_srcpad, outputpad) == GST_PAD_LINK_OK);
+
+  gst_pad_set_active (inputpad, TRUE);
+  gst_pad_set_active (outputpad, TRUE);
+  gst_pad_push_event (inputpad, gst_event_new_stream_start ("test"));
+  gst_pad_push_event (inputpad, gst_event_new_segment (&segment));
+
+  gst_element_set_state (mq, GST_STATE_PAUSED);
+
+  /* push a buffer with PTS = 0 */
+  buffer = gst_buffer_new ();
+  GST_BUFFER_PTS (buffer) = 0;
+  fail_unless (gst_pad_push (inputpad, buffer) == GST_FLOW_OK);
+
+  /* push a buffer with PTS = NONE */
+  buffer = gst_buffer_new ();
+  GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
+  fail_unless (gst_pad_push (inputpad, buffer) == GST_FLOW_OK);
+
+  /* push a buffer with PTS = 1s, so we have 1s of data in multiqueue, we are
+   * full */
+  buffer = gst_buffer_new ();
+  GST_BUFFER_PTS (buffer) = GST_SECOND;
+  fail_unless (gst_pad_push (inputpad, buffer) == GST_FLOW_OK);
+
+  /* push a buffer with PTS = NONE, the queue is full so it should overrun */
+  expect_overrun = TRUE;
+  buffer = gst_buffer_new ();
+  GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
+  fail_unless (gst_pad_push (inputpad, buffer) == GST_FLOW_OK);
+
+  g_mutex_lock (&block_mutex);
+  unblock_count = -1;
+  g_cond_signal (&block_cond);
+  g_mutex_unlock (&block_mutex);
+
+  gst_element_set_state (mq, GST_STATE_NULL);
+  gst_object_unref (inputpad);
+  gst_object_unref (outputpad);
+  gst_object_unref (mq);
+  g_mutex_clear (&block_mutex);
+  g_cond_clear (&block_cond);
+}
+
+GST_END_TEST;
+
 static Suite *
 multiqueue_suite (void)
 {
@@ -837,6 +968,8 @@ multiqueue_suite (void)
   tcase_add_test (tc_chain, test_sparse_stream);
   tcase_add_test (tc_chain, test_limit_changes);
 
+  tcase_add_test (tc_chain, test_buffering_with_none_pts);
+
   return s;
 }