From: Stefan Sauer Date: Mon, 18 Feb 2013 19:47:04 +0000 (+0100) Subject: collectpads: add two more tests using collectpads within an element X-Git-Tag: 1.1.1~255 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=03e81ca8a2d01181d3733c172c604bc187eec55a;p=platform%2Fupstream%2Fgstreamer.git collectpads: add two more tests using collectpads within an element Add a static plugin with a rudimentary element using collectpads and do some pipeline based tests. --- diff --git a/tests/check/libs/collectpads.c b/tests/check/libs/collectpads.c index 30a64e3..3336cc4 100644 --- a/tests/check/libs/collectpads.c +++ b/tests/check/libs/collectpads.c @@ -21,9 +21,232 @@ * Boston, MA 02110-1301, USA. */ +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + #include #include +/* dummy collectpads based element */ + +#define GST_TYPE_AGGREGATOR (gst_aggregator_get_type ()) +#define GST_AGGREGATOR(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_AGGREGATOR, GstAggregator)) +#define GST_AGGREGATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_AGGREGATOR, GstAggregatorClass)) +#define GST_AGGREGATOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_AGGREGATOR, GstAggregatorClass)) + +typedef struct _GstAggregator GstAggregator; +typedef struct _GstAggregatorClass GstAggregatorClass; + +struct _GstAggregator +{ + GstElement parent; + GstCollectPads *collect; + GstPad *srcpad; + GstPad *sinkpad[2]; + gint padcount; +}; +struct _GstAggregatorClass +{ + GstElementClass parent_class; +}; + +static GType gst_aggregator_get_type (void); + +G_DEFINE_TYPE (GstAggregator, gst_aggregator, GST_TYPE_ELEMENT); + +static GstStaticPadTemplate gst_aggregator_src_template = +GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +static GstStaticPadTemplate gst_aggregator_sink_template = +GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST, + GST_STATIC_CAPS_ANY); + +static GstFlowReturn +gst_agregator_collected (GstCollectPads * pads, gpointer user_data) +{ + GstAggregator *aggregator = GST_AGGREGATOR (user_data); + GstBuffer *inbuf; + GstCollectData *collect_data = (GstCollectData *) pads->data->data; + guint outsize = gst_collect_pads_available (pads); + + /* can only happen when no pads to collect or all EOS */ + if (outsize == 0) + goto eos; + + inbuf = gst_collect_pads_take_buffer (pads, collect_data, outsize); + if (!inbuf) + goto eos; + + /* just forward the first buffer */ + GST_DEBUG_OBJECT (aggregator, "forward buffer %p", inbuf); + return gst_pad_push (aggregator->srcpad, inbuf); + /* ERRORS */ +eos: + { + GST_DEBUG_OBJECT (aggregator, "no data available, must be EOS"); + gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ()); + return GST_FLOW_EOS; + } +} + +static GstPad * +gst_aggregator_request_new_pad (GstElement * element, GstPadTemplate * templ, + const gchar * unused, const GstCaps * caps) +{ + GstAggregator *aggregator = GST_AGGREGATOR (element); + gchar *name; + GstPad *newpad; + gint padcount; + + if (templ->direction != GST_PAD_SINK) + return NULL; + + /* create new pad */ + padcount = g_atomic_int_add (&aggregator->padcount, 1); + name = g_strdup_printf ("sink_%u", padcount); + newpad = gst_pad_new_from_template (templ, name); + g_free (name); + + gst_collect_pads_add_pad (aggregator->collect, newpad, + sizeof (GstCollectData), NULL, TRUE); + + /* takes ownership of the pad */ + if (!gst_element_add_pad (GST_ELEMENT (aggregator), newpad)) + goto could_not_add; + + GST_DEBUG_OBJECT (aggregator, "added new pad %s", GST_OBJECT_NAME (newpad)); + return newpad; + + /* errors */ +could_not_add: + { + GST_DEBUG_OBJECT (aggregator, "could not add pad"); + gst_collect_pads_remove_pad (aggregator->collect, newpad); + gst_object_unref (newpad); + return NULL; + } +} + +static void +gst_aggregator_release_pad (GstElement * element, GstPad * pad) +{ + GstAggregator *aggregator = GST_AGGREGATOR (element); + + if (aggregator->collect) + gst_collect_pads_remove_pad (aggregator->collect, pad); + gst_element_remove_pad (element, pad); +} + +static GstStateChangeReturn +gst_aggregator_change_state (GstElement * element, GstStateChange transition) +{ + GstAggregator *aggregator = GST_AGGREGATOR (element); + GstStateChangeReturn ret; + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + gst_collect_pads_start (aggregator->collect); + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + /* need to unblock the collectpads before calling the + * parent change_state so that streaming can finish */ + gst_collect_pads_stop (aggregator->collect); + break; + default: + break; + } + + ret = + GST_ELEMENT_CLASS (gst_aggregator_parent_class)->change_state (element, + transition); + + switch (transition) { + default: + break; + } + + return ret; +} + +static void +gst_aggregator_dispose (GObject * object) +{ + GstAggregator *aggregator = GST_AGGREGATOR (object); + + if (aggregator->collect) { + gst_object_unref (aggregator->collect); + aggregator->collect = NULL; + } + + G_OBJECT_CLASS (gst_aggregator_parent_class)->dispose (object); +} + +static void +gst_aggregator_class_init (GstAggregatorClass * klass) +{ + GObjectClass *gobject_class = (GObjectClass *) klass; + GstElementClass *gstelement_class = (GstElementClass *) klass; + + gobject_class->dispose = gst_aggregator_dispose; + + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&gst_aggregator_src_template)); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&gst_aggregator_sink_template)); + gst_element_class_set_static_metadata (gstelement_class, "Aggregator", + "Testing", "Combine N buffers", "Stefan Sauer "); + + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad); + gstelement_class->release_pad = + GST_DEBUG_FUNCPTR (gst_aggregator_release_pad); + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_aggregator_change_state); +} + +static void +gst_aggregator_init (GstAggregator * agregator) +{ + GstPadTemplate *template; + + template = gst_static_pad_template_get (&gst_aggregator_src_template); + agregator->srcpad = gst_pad_new_from_template (template, "src"); + gst_object_unref (template); + + GST_PAD_SET_PROXY_CAPS (agregator->srcpad); + gst_element_add_pad (GST_ELEMENT (agregator), agregator->srcpad); + + /* keep track of the sinkpads requested */ + agregator->collect = gst_collect_pads_new (); + gst_collect_pads_set_function (agregator->collect, + GST_DEBUG_FUNCPTR (gst_agregator_collected), agregator); +} + +static gboolean +gst_agregator_plugin_init (GstPlugin * plugin) +{ + return gst_element_register (plugin, "aggregator", GST_RANK_NONE, + GST_TYPE_AGGREGATOR); +} + +static gboolean +gst_agregator_plugin_register (void) +{ + return gst_plugin_register_static (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + "aggregator", + "Combine buffers", + gst_agregator_plugin_init, + VERSION, GST_LICENSE, PACKAGE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN); +} + + #define fail_unless_collected(expected) \ G_STMT_START { \ g_mutex_lock (&lock); \ @@ -430,11 +653,115 @@ GST_START_TEST (test_collect_default) GST_END_TEST; + +#define NUM_BUFFERS 3 +static void +handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count) +{ + *count = *count + 1; +} + +/* Test a linear pipeline using aggregator */ +GST_START_TEST (test_linear_pipeline) +{ + GstElement *pipeline, *src, *agg, *sink; + GstBus *bus; + GstMessage *msg; + gint count = 0; + + pipeline = gst_pipeline_new ("pipeline"); + src = gst_check_setup_element ("fakesrc"); + g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4, + NULL); + agg = gst_check_setup_element ("aggregator"); + sink = gst_check_setup_element ("fakesink"); + g_object_set (sink, "signal-handoffs", TRUE, NULL); + g_signal_connect (sink, "handoff", (GCallback) handoff, &count); + + fail_unless (gst_bin_add (GST_BIN (pipeline), src)); + fail_unless (gst_bin_add (GST_BIN (pipeline), agg)); + fail_unless (gst_bin_add (GST_BIN (pipeline), sink)); + fail_unless (gst_element_link (src, agg)); + fail_unless (gst_element_link (agg, sink)); + + bus = gst_element_get_bus (pipeline); + fail_if (bus == NULL); + gst_element_set_state (pipeline, GST_STATE_PLAYING); + + msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); + fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS); + gst_message_unref (msg); + + fail_unless_equals_int (count, NUM_BUFFERS); + + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_object_unref (bus); + gst_object_unref (pipeline); +} + +GST_END_TEST; + +/* Test a linear pipeline using aggregator */ +GST_START_TEST (test_branched_pipeline) +{ + GstElement *pipeline, *src, *tee, *queue[2], *agg, *sink; + GstBus *bus; + GstMessage *msg; + gint count = 0; + + pipeline = gst_pipeline_new ("pipeline"); + src = gst_check_setup_element ("fakesrc"); + g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4, + NULL); + tee = gst_check_setup_element ("tee"); + queue[0] = gst_check_setup_element ("queue"); + gst_object_set_name (GST_OBJECT (queue[0]), "queue0"); + queue[1] = gst_check_setup_element ("queue"); + gst_object_set_name (GST_OBJECT (queue[1]), "queue1"); + agg = gst_check_setup_element ("aggregator"); + sink = gst_check_setup_element ("fakesink"); + g_object_set (sink, "signal-handoffs", TRUE, NULL); + g_signal_connect (sink, "handoff", (GCallback) handoff, &count); + + fail_unless (gst_bin_add (GST_BIN (pipeline), src)); + fail_unless (gst_bin_add (GST_BIN (pipeline), tee)); + fail_unless (gst_bin_add (GST_BIN (pipeline), queue[0])); + fail_unless (gst_bin_add (GST_BIN (pipeline), queue[1])); + fail_unless (gst_bin_add (GST_BIN (pipeline), agg)); + fail_unless (gst_bin_add (GST_BIN (pipeline), sink)); + fail_unless (gst_element_link (src, tee)); + fail_unless (gst_element_link (tee, queue[0])); + fail_unless (gst_element_link (tee, queue[1])); + fail_unless (gst_element_link (queue[0], agg)); + fail_unless (gst_element_link (queue[1], agg)); + fail_unless (gst_element_link (agg, sink)); + + bus = gst_element_get_bus (pipeline); + fail_if (bus == NULL); + gst_element_set_state (pipeline, GST_STATE_PLAYING); + + msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); + fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS); + gst_message_unref (msg); + + /* we have two branches, but we still only forward buffers from one branch */ + fail_unless_equals_int (count, NUM_BUFFERS); + + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_object_unref (bus); + gst_object_unref (pipeline); +} + +GST_END_TEST; + + static Suite * gst_collect_pads_suite (void) { Suite *suite; - TCase *general, *buffers; + TCase *general, *buffers, *pipeline; + + gst_agregator_plugin_register (); suite = suite_create ("GstCollectPads"); general = tcase_create ("general"); @@ -450,6 +777,11 @@ gst_collect_pads_suite (void) tcase_add_checked_fixture (buffers, setup_buffer_cb, teardown); tcase_add_test (buffers, test_collect_default); + pipeline = tcase_create ("pipeline"); + suite_add_tcase (suite, pipeline); + tcase_add_test (pipeline, test_linear_pipeline); + tcase_add_test (pipeline, test_branched_pipeline); + return suite; }