tests: collectpads: add flushing seek tests
authorAlessandro Decina <alessandro.d@gmail.com>
Mon, 16 Sep 2013 06:35:37 +0000 (08:35 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Mon, 11 Nov 2013 15:50:42 +0000 (16:50 +0100)
https://bugzilla.gnome.org/show_bug.cgi?id=708416

tests/check/libs/collectpads.c

index 6de82d8..689353e 100644 (file)
@@ -291,6 +291,7 @@ typedef struct
   GstPad *pad;
   GstBuffer *buffer;
   GstEvent *event;
+  GstFlowReturn expected_result;
 } TestData;
 
 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
@@ -305,10 +306,13 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
 
 static GstCollectPads *collect;
 static gboolean collected;
-static GstPad *srcpad1, *srcpad2;
+static GstPad *agg_srcpad, *srcpad1, *srcpad2;
 static GstPad *sinkpad1, *sinkpad2;
 static TestData *data1, *data2;
 static GstBuffer *outbuf1, *outbuf2;
+static GstElement *agg;
+gboolean fail_seek;
+gint flush_start_events, flush_stop_events;
 
 static GMutex lock;
 static GCond cond;
@@ -362,7 +366,7 @@ push_buffer (gpointer user_data)
   gst_pad_push_event (test_data->pad, gst_event_new_segment (&segment));
 
   flow = gst_pad_push (test_data->pad, test_data->buffer);
-  fail_unless (flow == GST_FLOW_OK, "got flow %s instead of OK",
+  fail_unless (flow == test_data->expected_result, "got flow %s instead of OK",
       gst_flow_get_name (flow));
 
   return NULL;
@@ -779,16 +783,237 @@ GST_START_TEST (test_branched_pipeline)
 
 GST_END_TEST;
 
+static GstPadProbeReturn
+downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+  if (info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
+    if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
+        GST_EVENT_FLUSH_START)
+      g_atomic_int_inc (&flush_start_events);
+    else if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
+        GST_EVENT_FLUSH_STOP)
+      g_atomic_int_inc (&flush_stop_events);
+  } else if (info->type & GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM) {
+    g_mutex_lock (&lock);
+    collected = TRUE;
+    g_cond_signal (&cond);
+    g_mutex_unlock (&lock);
+  }
+
+  return GST_PAD_PROBE_DROP;
+}
+
+static gboolean
+src_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  gboolean ret = TRUE;
+  if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK) {
+    if (g_atomic_int_compare_and_exchange (&fail_seek, TRUE, FALSE) == TRUE) {
+      ret = FALSE;
+    }
+  }
+
+  gst_event_unref (event);
+  return ret;
+}
+
+static gboolean
+agg_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  return gst_collect_pads_src_event_default (GST_AGGREGATOR (parent)->collect,
+      pad, event);
+}
+
+static GstPad *
+setup_src_pad (GstElement * element,
+    GstStaticPadTemplate * tmpl, const char *name)
+{
+  GstPad *srcpad, *sinkpad;
+
+  srcpad = gst_pad_new_from_static_template (tmpl, "src");
+  sinkpad = gst_element_get_request_pad (element, name);
+  fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK,
+      "Could not link source and %s sink pads", GST_ELEMENT_NAME (element));
+  gst_pad_set_event_function (srcpad, src_event);
+  gst_pad_set_active (srcpad, TRUE);
+
+  return srcpad;
+}
+
+static void
+flush_setup (void)
+{
+  agg = gst_check_setup_element ("aggregator");
+  agg_srcpad = gst_element_get_static_pad (agg, "src");
+  srcpad1 = setup_src_pad (agg, &srctemplate, "sink_0");
+  srcpad2 = setup_src_pad (agg, &srctemplate, "sink_1");
+  gst_pad_add_probe (agg_srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
+      GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM |
+      GST_PAD_PROBE_TYPE_EVENT_FLUSH, downstream_probe_cb, NULL, NULL);
+  gst_pad_set_event_function (agg_srcpad, agg_src_event);
+  data1 = g_new0 (TestData, 1);
+  data2 = g_new0 (TestData, 1);
+  g_atomic_int_set (&flush_start_events, 0);
+  g_atomic_int_set (&flush_stop_events, 0);
+  gst_element_set_state (agg, GST_STATE_PLAYING);
+}
+
+static void
+flush_teardown (void)
+{
+  gst_element_set_state (agg, GST_STATE_NULL);
+  gst_object_unref (agg);
+  gst_object_unref (agg_srcpad);
+  gst_object_unref (srcpad1);
+  gst_object_unref (srcpad2);
+  g_free (data1);
+  g_free (data2);
+}
+
+GST_START_TEST (test_flushing_seek_failure)
+{
+  GstBuffer *buf1, *buf2;
+  GThread *thread1, *thread2;
+  GstEvent *event;
+
+  /* Queue a buffer in agg:sink_1. Do a flushing seek and simulate one upstream
+   * element failing to handle the seek (see src_event()). Check that the
+   * flushing seek logic doesn't get triggered by checking that the buffer
+   * queued on agg:sink_1 doesn't get flushed.
+   */
+
+  /* queue a buffer in agg:sink_1 */
+  buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
+  GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND;
+  data2->pad = srcpad2;
+  data2->buffer = buf2;
+  thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
+  fail_unless_collected (FALSE);
+
+  /* do the seek */
+  event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
+      GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
+  g_atomic_int_set (&fail_seek, TRUE);
+  fail_if (gst_pad_send_event (agg_srcpad, event));
+
+  /* flush srcpad1 (pretending it's the upstream that didn't fail to seek) */
+  fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
+  fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
+
+  /* check that the flush events reached agg:src */
+  fail_unless_equals_int (flush_start_events, 1);
+  fail_unless_equals_int (flush_stop_events, 1);
+
+  /* push a buffer on agg:sink_0. This should trigger a collect since agg:sink_1
+   * should not have been flushed at this point */
+  buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
+  GST_BUFFER_TIMESTAMP (buf1) = 0;
+  data1->pad = srcpad1;
+  data1->buffer = buf1;
+  thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
+  fail_unless_collected (TRUE);
+  collected = FALSE;
+
+  /* at this point thread1 must have returned */
+  g_thread_join (thread1);
+
+  /* push eos on agg:sink_0 so the buffer queued in agg:sink_1 is collected and
+   * the pushing thread returns */
+  data1->pad = srcpad1;
+  data1->event = gst_event_new_eos ();
+  thread1 = g_thread_try_new ("gst-check", push_event, data1, NULL);
+  fail_unless_collected (TRUE);
+
+  g_thread_join (thread1);
+  g_thread_join (thread2);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_flushing_seek)
+{
+  GstBuffer *buf1, *buf2;
+  GThread *thread1, *thread2;
+  GstEvent *event;
+
+  /* Queue a buffer in agg:sink_1. Then do a flushing seek and check that the
+   * new flushing seek logic is triggered. On the first FLUSH_START call the
+   * buffers queued in collectpads should get flushed. Only one FLUSH_START and
+   * one FLUSH_STOP should be forwarded downstream.
+   */
+  buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
+  GST_BUFFER_TIMESTAMP (buf2) = 0;
+  data2->pad = srcpad2;
+  data2->buffer = buf2;
+  /* expect this buffer to be flushed */
+  data2->expected_result = GST_FLOW_FLUSHING;
+  thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
+
+  /* now do a successful flushing seek */
+  event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
+      GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
+  g_atomic_int_set (&fail_seek, FALSE);
+  fail_unless (gst_pad_send_event (agg_srcpad, event));
+
+  /* flushing starts once one of the upstream elements sends the first
+   * FLUSH_START */
+  fail_unless_equals_int (flush_start_events, 0);
+  fail_unless_equals_int (flush_stop_events, 0);
+
+  /* flush ogg:sink_0. This flushs collectpads, calls ::flush() and sends
+   * FLUSH_START downstream */
+  fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
+  fail_unless_equals_int (flush_start_events, 1);
+  fail_unless_equals_int (flush_stop_events, 0);
+  /* the first FLUSH_STOP is forwarded downstream */
+  fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
+  fail_unless_equals_int (flush_start_events, 1);
+  fail_unless_equals_int (flush_stop_events, 1);
+  /* at this point even the other pad agg:sink_1 should be flushing so thread2
+   * should have stopped */
+  g_thread_join (thread2);
+
+  /* push a buffer on agg:sink_0 to trigger one collect after flushing to verify
+   * that flushing completes once all the pads have been flushed */
+  buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
+  GST_BUFFER_TIMESTAMP (buf1) = GST_SECOND;
+  data1->pad = srcpad1;
+  data1->buffer = buf1;
+  thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
+
+  /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is
+   * sent downstream */
+  gst_pad_push_event (srcpad2, gst_event_new_flush_start ());
+  gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE));
+
+  /* still, only one FLUSH_START and one FLUSH_STOP are forwarded downstream */
+  fail_unless_equals_int (flush_start_events, 1);
+  fail_unless_equals_int (flush_stop_events, 1);
+
+  /* EOS agg:sink_1 so the buffer queued in agg:sink_0 is collected */
+  data2->pad = srcpad2;
+  data2->event = gst_event_new_eos ();
+  thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
+  fail_unless_collected (TRUE);
+
+  /* these will return immediately as at this point the threads have been
+   * unlocked and are finished */
+  g_thread_join (thread1);
+  g_thread_join (thread2);
+}
+
+GST_END_TEST;
 
 static Suite *
 gst_collect_pads_suite (void)
 {
   Suite *suite;
-  TCase *general, *buffers, *pipeline;
+  TCase *general, *buffers, *pipeline, *flush;
 
   gst_agregator_plugin_register ();
 
   suite = suite_create ("GstCollectPads");
+
   general = tcase_create ("general");
   suite_add_tcase (suite, general);
   tcase_add_checked_fixture (general, setup, teardown);
@@ -807,6 +1032,12 @@ gst_collect_pads_suite (void)
   tcase_add_test (pipeline, test_linear_pipeline);
   tcase_add_test (pipeline, test_branched_pipeline);
 
+  flush = tcase_create ("flush");
+  suite_add_tcase (suite, flush);
+  tcase_add_checked_fixture (flush, flush_setup, flush_teardown);
+  tcase_add_test (flush, test_flushing_seek_failure);
+  tcase_add_test (flush, test_flushing_seek);
+
   return suite;
 }