From f52b5ddcd2de711c1e095d6b4875566cffcc8244 Mon Sep 17 00:00:00 2001 From: Alessandro Decina Date: Mon, 16 Sep 2013 08:35:37 +0200 Subject: [PATCH] tests: collectpads: add flushing seek tests https://bugzilla.gnome.org/show_bug.cgi?id=708416 --- tests/check/libs/collectpads.c | 237 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 234 insertions(+), 3 deletions(-) diff --git a/tests/check/libs/collectpads.c b/tests/check/libs/collectpads.c index 6de82d8..689353e 100644 --- a/tests/check/libs/collectpads.c +++ b/tests/check/libs/collectpads.c @@ -291,6 +291,7 @@ typedef struct GstPad *pad; GstBuffer *buffer; GstEvent *event; + GstFlowReturn expected_result; } TestData; static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", @@ -305,10 +306,13 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", static GstCollectPads *collect; static gboolean collected; -static GstPad *srcpad1, *srcpad2; +static GstPad *agg_srcpad, *srcpad1, *srcpad2; static GstPad *sinkpad1, *sinkpad2; static TestData *data1, *data2; static GstBuffer *outbuf1, *outbuf2; +static GstElement *agg; +gboolean fail_seek; +gint flush_start_events, flush_stop_events; static GMutex lock; static GCond cond; @@ -362,7 +366,7 @@ push_buffer (gpointer user_data) gst_pad_push_event (test_data->pad, gst_event_new_segment (&segment)); flow = gst_pad_push (test_data->pad, test_data->buffer); - fail_unless (flow == GST_FLOW_OK, "got flow %s instead of OK", + fail_unless (flow == test_data->expected_result, "got flow %s instead of OK", gst_flow_get_name (flow)); return NULL; @@ -779,16 +783,237 @@ GST_START_TEST (test_branched_pipeline) GST_END_TEST; +static GstPadProbeReturn +downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) +{ + if (info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) { + if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) == + GST_EVENT_FLUSH_START) + g_atomic_int_inc (&flush_start_events); + else if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) == + GST_EVENT_FLUSH_STOP) + g_atomic_int_inc (&flush_stop_events); + } else if (info->type & GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM) { + g_mutex_lock (&lock); + collected = TRUE; + g_cond_signal (&cond); + g_mutex_unlock (&lock); + } + + return GST_PAD_PROBE_DROP; +} + +static gboolean +src_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + gboolean ret = TRUE; + if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK) { + if (g_atomic_int_compare_and_exchange (&fail_seek, TRUE, FALSE) == TRUE) { + ret = FALSE; + } + } + + gst_event_unref (event); + return ret; +} + +static gboolean +agg_src_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + return gst_collect_pads_src_event_default (GST_AGGREGATOR (parent)->collect, + pad, event); +} + +static GstPad * +setup_src_pad (GstElement * element, + GstStaticPadTemplate * tmpl, const char *name) +{ + GstPad *srcpad, *sinkpad; + + srcpad = gst_pad_new_from_static_template (tmpl, "src"); + sinkpad = gst_element_get_request_pad (element, name); + fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK, + "Could not link source and %s sink pads", GST_ELEMENT_NAME (element)); + gst_pad_set_event_function (srcpad, src_event); + gst_pad_set_active (srcpad, TRUE); + + return srcpad; +} + +static void +flush_setup (void) +{ + agg = gst_check_setup_element ("aggregator"); + agg_srcpad = gst_element_get_static_pad (agg, "src"); + srcpad1 = setup_src_pad (agg, &srctemplate, "sink_0"); + srcpad2 = setup_src_pad (agg, &srctemplate, "sink_1"); + gst_pad_add_probe (agg_srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | + GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | + GST_PAD_PROBE_TYPE_EVENT_FLUSH, downstream_probe_cb, NULL, NULL); + gst_pad_set_event_function (agg_srcpad, agg_src_event); + data1 = g_new0 (TestData, 1); + data2 = g_new0 (TestData, 1); + g_atomic_int_set (&flush_start_events, 0); + g_atomic_int_set (&flush_stop_events, 0); + gst_element_set_state (agg, GST_STATE_PLAYING); +} + +static void +flush_teardown (void) +{ + gst_element_set_state (agg, GST_STATE_NULL); + gst_object_unref (agg); + gst_object_unref (agg_srcpad); + gst_object_unref (srcpad1); + gst_object_unref (srcpad2); + g_free (data1); + g_free (data2); +} + +GST_START_TEST (test_flushing_seek_failure) +{ + GstBuffer *buf1, *buf2; + GThread *thread1, *thread2; + GstEvent *event; + + /* Queue a buffer in agg:sink_1. Do a flushing seek and simulate one upstream + * element failing to handle the seek (see src_event()). Check that the + * flushing seek logic doesn't get triggered by checking that the buffer + * queued on agg:sink_1 doesn't get flushed. + */ + + /* queue a buffer in agg:sink_1 */ + buf2 = gst_buffer_new_allocate (NULL, 1, NULL); + GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND; + data2->pad = srcpad2; + data2->buffer = buf2; + thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL); + fail_unless_collected (FALSE); + + /* do the seek */ + event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH, + GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND); + g_atomic_int_set (&fail_seek, TRUE); + fail_if (gst_pad_send_event (agg_srcpad, event)); + + /* flush srcpad1 (pretending it's the upstream that didn't fail to seek) */ + fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ())); + fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE))); + + /* check that the flush events reached agg:src */ + fail_unless_equals_int (flush_start_events, 1); + fail_unless_equals_int (flush_stop_events, 1); + + /* push a buffer on agg:sink_0. This should trigger a collect since agg:sink_1 + * should not have been flushed at this point */ + buf1 = gst_buffer_new_allocate (NULL, 1, NULL); + GST_BUFFER_TIMESTAMP (buf1) = 0; + data1->pad = srcpad1; + data1->buffer = buf1; + thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL); + fail_unless_collected (TRUE); + collected = FALSE; + + /* at this point thread1 must have returned */ + g_thread_join (thread1); + + /* push eos on agg:sink_0 so the buffer queued in agg:sink_1 is collected and + * the pushing thread returns */ + data1->pad = srcpad1; + data1->event = gst_event_new_eos (); + thread1 = g_thread_try_new ("gst-check", push_event, data1, NULL); + fail_unless_collected (TRUE); + + g_thread_join (thread1); + g_thread_join (thread2); +} + +GST_END_TEST; + +GST_START_TEST (test_flushing_seek) +{ + GstBuffer *buf1, *buf2; + GThread *thread1, *thread2; + GstEvent *event; + + /* Queue a buffer in agg:sink_1. Then do a flushing seek and check that the + * new flushing seek logic is triggered. On the first FLUSH_START call the + * buffers queued in collectpads should get flushed. Only one FLUSH_START and + * one FLUSH_STOP should be forwarded downstream. + */ + buf2 = gst_buffer_new_allocate (NULL, 1, NULL); + GST_BUFFER_TIMESTAMP (buf2) = 0; + data2->pad = srcpad2; + data2->buffer = buf2; + /* expect this buffer to be flushed */ + data2->expected_result = GST_FLOW_FLUSHING; + thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL); + + /* now do a successful flushing seek */ + event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH, + GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND); + g_atomic_int_set (&fail_seek, FALSE); + fail_unless (gst_pad_send_event (agg_srcpad, event)); + + /* flushing starts once one of the upstream elements sends the first + * FLUSH_START */ + fail_unless_equals_int (flush_start_events, 0); + fail_unless_equals_int (flush_stop_events, 0); + + /* flush ogg:sink_0. This flushs collectpads, calls ::flush() and sends + * FLUSH_START downstream */ + fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ())); + fail_unless_equals_int (flush_start_events, 1); + fail_unless_equals_int (flush_stop_events, 0); + /* the first FLUSH_STOP is forwarded downstream */ + fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE))); + fail_unless_equals_int (flush_start_events, 1); + fail_unless_equals_int (flush_stop_events, 1); + /* at this point even the other pad agg:sink_1 should be flushing so thread2 + * should have stopped */ + g_thread_join (thread2); + + /* push a buffer on agg:sink_0 to trigger one collect after flushing to verify + * that flushing completes once all the pads have been flushed */ + buf1 = gst_buffer_new_allocate (NULL, 1, NULL); + GST_BUFFER_TIMESTAMP (buf1) = GST_SECOND; + data1->pad = srcpad1; + data1->buffer = buf1; + thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL); + + /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is + * sent downstream */ + gst_pad_push_event (srcpad2, gst_event_new_flush_start ()); + gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE)); + + /* still, only one FLUSH_START and one FLUSH_STOP are forwarded downstream */ + fail_unless_equals_int (flush_start_events, 1); + fail_unless_equals_int (flush_stop_events, 1); + + /* EOS agg:sink_1 so the buffer queued in agg:sink_0 is collected */ + data2->pad = srcpad2; + data2->event = gst_event_new_eos (); + thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL); + fail_unless_collected (TRUE); + + /* these will return immediately as at this point the threads have been + * unlocked and are finished */ + g_thread_join (thread1); + g_thread_join (thread2); +} + +GST_END_TEST; static Suite * gst_collect_pads_suite (void) { Suite *suite; - TCase *general, *buffers, *pipeline; + TCase *general, *buffers, *pipeline, *flush; gst_agregator_plugin_register (); suite = suite_create ("GstCollectPads"); + general = tcase_create ("general"); suite_add_tcase (suite, general); tcase_add_checked_fixture (general, setup, teardown); @@ -807,6 +1032,12 @@ gst_collect_pads_suite (void) tcase_add_test (pipeline, test_linear_pipeline); tcase_add_test (pipeline, test_branched_pipeline); + flush = tcase_create ("flush"); + suite_add_tcase (suite, flush); + tcase_add_checked_fixture (flush, flush_setup, flush_teardown); + tcase_add_test (flush, test_flushing_seek_failure); + tcase_add_test (flush, test_flushing_seek); + return suite; } -- 2.7.4