aggregator: refactor the test helper
authorStefan Sauer <ensonic@users.sf.net>
Sat, 14 Oct 2017 11:26:02 +0000 (13:26 +0200)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:27 +0000 (15:10 +0000)
Make the test helpers use a queue. This lets us also test sequences of events,
queries and data.

tests/check/libs/aggregator.c

index 692d867..24d7a95 100644 (file)
@@ -194,8 +194,7 @@ gst_test_aggregator_plugin_register (void)
 
 typedef struct
 {
-  GstEvent *event;
-  GstBuffer *buffer;
+  GQueue *queue;
   GstElement *aggregator;
   GstPad *sinkpad, *srcpad;
   GstFlowReturn expected_result;
@@ -254,48 +253,44 @@ start_flow (ChainData * chain_data)
 }
 
 static gpointer
-push_buffer (gpointer user_data)
-{
-  GstFlowReturn flow;
-  ChainData *chain_data = (ChainData *) user_data;
-
-  start_flow (chain_data);
-
-  GST_INFO_OBJECT (chain_data->sinkpad, "Pushing buffer %" GST_PTR_FORMAT,
-      chain_data->buffer);
-  flow = gst_pad_push (chain_data->srcpad, chain_data->buffer);
-  fail_unless (flow == chain_data->expected_result,
-      "got flow %s instead of %s on %s:%s", gst_flow_get_name (flow),
-      gst_flow_get_name (chain_data->expected_result),
-      GST_DEBUG_PAD_NAME (chain_data->sinkpad));
-  chain_data->buffer = NULL;
-
-  return NULL;
-}
-
-static gpointer
-push_event (gpointer user_data)
+push_data (gpointer user_data)
 {
   ChainData *chain_data = (ChainData *) user_data;
   GstTestAggregator *aggregator = (GstTestAggregator *) chain_data->aggregator;
-  GstEventType event_type = GST_EVENT_TYPE (chain_data->event);
+  GstPad *sinkpad = chain_data->sinkpad;
+  GstPad *srcpad = chain_data->srcpad;
+  gpointer data;
 
   start_flow (chain_data);
 
-  GST_INFO_OBJECT (chain_data->sinkpad, "Pushing event: %" GST_PTR_FORMAT,
-      chain_data->event);
-
-  switch (event_type) {
-    case GST_EVENT_GAP:
-      aggregator->gap_expected = TRUE;
-      break;
-    default:
-      break;
+  while ((data = g_queue_pop_head (chain_data->queue))) {
+    GST_DEBUG_OBJECT (sinkpad, "Pushing %" GST_PTR_FORMAT, data);
+
+    /* switch on the data type and push */
+    if (GST_IS_BUFFER (data)) {
+      GstFlowReturn flow = gst_pad_push (srcpad, GST_BUFFER_CAST (data));
+      fail_unless (flow == chain_data->expected_result,
+          "got flow %s instead of %s on %s:%s", gst_flow_get_name (flow),
+          gst_flow_get_name (chain_data->expected_result),
+          GST_DEBUG_PAD_NAME (sinkpad));
+    } else if (GST_IS_EVENT (data)) {
+      switch (GST_EVENT_TYPE (data)) {
+        case GST_EVENT_GAP:
+          aggregator->gap_expected = TRUE;
+          break;
+        default:
+          break;
+      }
+      fail_unless (gst_pad_push_event (srcpad, GST_EVENT_CAST (data)));
+    } else if (GST_IS_QUERY (data)) {
+      /* we don't care whether the query actualy got handled */
+      gst_pad_peer_query (srcpad, GST_QUERY_CAST (data));
+      gst_query_unref (GST_QUERY_CAST (data));
+    } else {
+      GST_WARNING_OBJECT (sinkpad, "bad queue entry: %" GST_PTR_FORMAT, data);
+    }
   }
-
-  fail_unless (gst_pad_push_event (chain_data->srcpad,
-          chain_data->event) == TRUE);
-  chain_data->event = NULL;
+  GST_DEBUG_OBJECT (sinkpad, "All data from queue sent");
 
   return NULL;
 }
@@ -352,10 +347,12 @@ _downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, TestData * test)
  * clear with _chain_data_clear after.
  */
 static void
-_chain_data_init (ChainData * data, GstElement * agg)
+_chain_data_init (ChainData * data, GstElement * agg, ...)
 {
   static gint num_src_pads = 0;
   gchar *pad_name = g_strdup_printf ("src%d", num_src_pads);
+  va_list var_args;
+  gpointer d;
 
   num_src_pads += 1;
 
@@ -363,21 +360,46 @@ _chain_data_init (ChainData * data, GstElement * agg)
   g_free (pad_name);
   gst_pad_set_active (data->srcpad, TRUE);
   data->aggregator = agg;
-  data->buffer = gst_buffer_new ();
   data->sinkpad = gst_element_get_request_pad (agg, "sink_%u");
   fail_unless (GST_IS_PAD (data->sinkpad));
   fail_unless (gst_pad_link (data->srcpad, data->sinkpad) == GST_PAD_LINK_OK);
+
+  /* add data items */
+  data->queue = g_queue_new ();
+  va_start (var_args, agg);
+  while (TRUE) {
+    if (!(d = va_arg (var_args, gpointer)))
+      break;
+    g_queue_push_tail (data->queue, d);
+    GST_DEBUG_OBJECT (data->sinkpad, "Adding to queue: %" GST_PTR_FORMAT, d);
+  }
+  va_end (var_args);
 }
 
 static void
-_chain_data_clear (ChainData * data)
+_chain_data_clear (ChainData * chain_data)
 {
-  gst_buffer_replace (&data->buffer, NULL);
-  gst_event_replace (&data->event, NULL);
-  if (data->srcpad)
-    gst_object_unref (data->srcpad);
-  if (data->sinkpad)
-    gst_object_unref (data->sinkpad);
+  gpointer data;
+
+  while ((data = g_queue_pop_head (chain_data->queue))) {
+    /* switch on the data type and free */
+    if (GST_IS_BUFFER (data)) {
+      gst_buffer_unref (GST_BUFFER_CAST (data));
+    } else if (GST_IS_EVENT (data)) {
+      gst_event_unref (GST_EVENT_CAST (data));
+    } else if (GST_IS_QUERY (data)) {
+      gst_query_unref (GST_QUERY_CAST (data));
+    } else {
+      GST_WARNING_OBJECT (chain_data->sinkpad, "bad queue entry: %"
+          GST_PTR_FORMAT, data);
+    }
+  }
+  g_queue_free (chain_data->queue);
+
+  if (chain_data->srcpad)
+    gst_object_unref (chain_data->srcpad);
+  if (chain_data->sinkpad)
+    gst_object_unref (chain_data->sinkpad);
 }
 
 static GstFlowReturn
@@ -441,11 +463,11 @@ GST_START_TEST (test_aggregate)
   TestData test = { 0, };
 
   _test_data_init (&test, FALSE);
-  _chain_data_init (&data1, test.aggregator);
-  _chain_data_init (&data2, test.aggregator);
+  _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
+  _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL);
 
-  thread1 = g_thread_try_new ("gst-check", push_buffer, &data1, NULL);
-  thread2 = g_thread_try_new ("gst-check", push_buffer, &data2, NULL);
+  thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
+  thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
 
   g_main_loop_run (test.ml);
   g_source_remove (test.timeout_id);
@@ -470,13 +492,11 @@ GST_START_TEST (test_aggregate_eos)
   TestData test = { 0, };
 
   _test_data_init (&test, FALSE);
-  _chain_data_init (&data1, test.aggregator);
-  _chain_data_init (&data2, test.aggregator);
+  _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
+  _chain_data_init (&data2, test.aggregator, gst_event_new_eos (), NULL);
 
-  data2.event = gst_event_new_eos ();
-
-  thread1 = g_thread_try_new ("gst-check", push_buffer, &data1, NULL);
-  thread2 = g_thread_try_new ("gst-check", push_event, &data2, NULL);
+  thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
+  thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
 
   g_main_loop_run (test.ml);
   g_source_remove (test.timeout_id);
@@ -500,11 +520,10 @@ GST_START_TEST (test_aggregate_gap)
   TestData test = { 0, };
 
   _test_data_init (&test, FALSE);
-  _chain_data_init (&data, test.aggregator);
-
-  data.event = gst_event_new_gap (TEST_GAP_PTS, TEST_GAP_DURATION);
+  _chain_data_init (&data, test.aggregator,
+      gst_event_new_gap (TEST_GAP_PTS, TEST_GAP_DURATION), NULL);
 
-  thread = g_thread_try_new ("gst-check", push_event, &data, NULL);
+  thread = g_thread_try_new ("gst-check", push_data, &data, NULL);
 
   g_main_loop_run (test.ml);
   g_source_remove (test.timeout_id);
@@ -708,6 +727,7 @@ GST_START_TEST (test_flushing_seek)
   ChainData data1 = { 0, };
   ChainData data2 = { 0, };
   TestData test = { 0, };
+  GstBuffer *buf;
 
   _test_data_init (&test, TRUE);
 
@@ -716,9 +736,11 @@ GST_START_TEST (test_flushing_seek)
    * buffers queued in collectpads should get flushed. Only one FLUSH_START and
    * one FLUSH_STOP should be forwarded downstream.
    */
-  _chain_data_init (&data1, test.aggregator);
-  _chain_data_init (&data2, test.aggregator);
-  GST_BUFFER_TIMESTAMP (data2.buffer) = 0;
+  _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
+
+  buf = gst_buffer_new ();
+  GST_BUFFER_TIMESTAMP (buf) = 0;
+  _chain_data_init (&data2, test.aggregator, buf, NULL);
 
   gst_segment_init (&GST_AGGREGATOR (test.aggregator)->segment,
       GST_FORMAT_TIME);
@@ -740,7 +762,7 @@ GST_START_TEST (test_flushing_seek)
 
   /* expect this buffer to be flushed */
   data2.expected_result = GST_FLOW_FLUSHING;
-  thread2 = g_thread_try_new ("gst-check", push_buffer, &data2, NULL);
+  thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
 
   fail_unless (gst_pad_push_event (data1.srcpad, gst_event_new_flush_start ()));
   fail_unless_equals_int (test.flush_start_events, 1);
@@ -758,7 +780,7 @@ GST_START_TEST (test_flushing_seek)
 
   /* push a buffer on agg:sink_0 to trigger one collect after flushing to verify
    * that flushing completes once all the pads have been flushed */
-  thread1 = g_thread_try_new ("gst-check", push_buffer, &data1, NULL);
+  thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
 
   /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is
    * sent downstream */
@@ -771,8 +793,8 @@ GST_START_TEST (test_flushing_seek)
   gst_pad_add_probe (test.srcpad, GST_PAD_PROBE_TYPE_BUFFER,
       (GstPadProbeCallback) _aggregated_cb, test.ml, NULL);
 
-  data2.event = gst_event_new_eos ();
-  thread2 = g_thread_try_new ("gst-check", push_event, &data2, NULL);
+  g_queue_push_tail (data2.queue, gst_event_new_eos ());
+  thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
 
   g_main_loop_run (test.ml);
   g_source_remove (test.timeout_id);