From 4cc7b818fdf7dfe2603f2198605c6b6ac7988883 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 3 Jul 2007 16:26:29 +0000 Subject: [PATCH] plugins/elements/gsttee.c: Be a lot smarter when deciding what srcpad to use for proxying the buffer_alloc. Also hand... Original commit message from CVS: * plugins/elements/gsttee.c: (gst_tee_base_init), (gst_tee_request_new_pad), (gst_tee_release_pad), (gst_tee_find_buffer_alloc), (gst_tee_buffer_alloc), (gst_tee_do_push), (clear_pads), (gst_tee_handle_buffer), (gst_tee_chain): Be a lot smarter when deciding what srcpad to use for proxying the buffer_alloc. Also handle pad added/removed when doing so. Fixes #357959. Keep track of what pads we already pushed on in case we have pads added/removed while pushing. Fixes #374639 * tests/check/Makefile.am: * tests/check/elements/tee.c: (handoff), (GST_START_TEST), (tee_suite): Added unit test for pad resync. --- ChangeLog | 18 ++++ plugins/elements/gsttee.c | 258 +++++++++++++++++++++++++++++++++++---------- tests/check/Makefile.am | 1 + tests/check/elements/tee.c | 145 +++++++++++++++++++++++++ 4 files changed, 364 insertions(+), 58 deletions(-) create mode 100644 tests/check/elements/tee.c diff --git a/ChangeLog b/ChangeLog index 2f607b4..a03eccc4 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,21 @@ +2007-07-03 Wim Taymans + + * plugins/elements/gsttee.c: (gst_tee_base_init), + (gst_tee_request_new_pad), (gst_tee_release_pad), + (gst_tee_find_buffer_alloc), (gst_tee_buffer_alloc), + (gst_tee_do_push), (clear_pads), (gst_tee_handle_buffer), + (gst_tee_chain): + Be a lot smarter when deciding what srcpad to use for proxying + the buffer_alloc. Also handle pad added/removed when doing so. + Fixes #357959. + Keep track of what pads we already pushed on in case we have pads + added/removed while pushing. Fixes #374639 + + * tests/check/Makefile.am: + * tests/check/elements/tee.c: (handoff), (GST_START_TEST), + (tee_suite): + Added unit test for pad resync. + 2007-07-01 Thomas Vander Stichele * po/nl.po: diff --git a/plugins/elements/gsttee.c b/plugins/elements/gsttee.c index f9ba0b7..5b5382e 100644 --- a/plugins/elements/gsttee.c +++ b/plugins/elements/gsttee.c @@ -1,7 +1,7 @@ /* GStreamer * Copyright (C) 1999,2000 Erik Walthinsen * 2000,2001,2002,2003,2004,2005 Wim Taymans - * + * 2007 Wim Taymans * * gsttee.c: Tee element, one in N out * @@ -91,6 +91,15 @@ GstStaticPadTemplate tee_src_template = GST_STATIC_PAD_TEMPLATE ("src%d", GST_BOILERPLATE_FULL (GstTee, gst_tee, GstElement, GST_TYPE_ELEMENT, _do_init); +/* structure and quark to keep track of which pads have been pushed */ +static GQuark push_data; + +typedef struct +{ + gboolean pushed; + GstFlowReturn result; +} PushData; + static GstPad *gst_tee_request_new_pad (GstElement * element, GstPadTemplate * temp, const gchar * unused); static void gst_tee_release_pad (GstElement * element, GstPad * pad); @@ -125,6 +134,8 @@ gst_tee_base_init (gpointer g_class) gst_static_pad_template_get (&sinktemplate)); gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&tee_src_template)); + + push_data = g_quark_from_static_string ("tee-push-data"); } static void @@ -211,19 +222,27 @@ gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ, GstTee *tee; GstActivateMode mode; gboolean res; + PushData *data; tee = GST_TEE (element); + GST_DEBUG_OBJECT (tee, "requesting pad"); + GST_OBJECT_LOCK (tee); name = g_strdup_printf ("src%d", tee->pad_counter++); srcpad = gst_pad_new_from_template (templ, name); g_free (name); - if (tee->allocpad == NULL) - tee->allocpad = srcpad; - mode = tee->sink_mode; + + /* install the data, we automatically free it when the pad is disposed because + * of _release_pad or when the element goes away. */ + data = g_new0 (PushData, 1); + data->pushed = FALSE; + data->result = GST_FLOW_NOT_LINKED; + g_object_set_qdata_full (G_OBJECT (srcpad), push_data, data, g_free); + GST_OBJECT_UNLOCK (tee); switch (mode) { @@ -275,6 +294,8 @@ gst_tee_release_pad (GstElement * element, GstPad * pad) tee = GST_TEE (element); + GST_DEBUG_OBJECT (tee, "releasing pad"); + GST_OBJECT_LOCK (tee); if (tee->allocpad == pad) tee->allocpad = NULL; @@ -348,6 +369,61 @@ gst_tee_get_property (GObject * object, guint prop_id, GValue * value, GST_OBJECT_UNLOCK (tee); } +/* we have no previous source pad we can use to proxy the pad alloc. Loop over + * the source pads, try to alloc a buffer on each one of them. Keep a reference + * to the first pad that succeeds, we will be using it to alloc more buffers + * later. */ +static GstFlowReturn +gst_tee_find_buffer_alloc (GstTee * tee, guint64 offset, guint size, + GstCaps * caps, GstBuffer ** buf) +{ + GstFlowReturn res; + GList *pads; + guint32 cookie; + + res = GST_FLOW_NOT_LINKED; + +retry: + pads = GST_ELEMENT_CAST (tee)->srcpads; + cookie = GST_ELEMENT_CAST (tee)->pads_cookie; + + while (pads) { + GstPad *pad; + + pad = GST_PAD_CAST (pads->data); + gst_object_ref (pad); + GST_DEBUG_OBJECT (tee, "try alloc on pad %s:%s", GST_DEBUG_PAD_NAME (pad)); + GST_OBJECT_UNLOCK (tee); + + res = gst_pad_alloc_buffer (pad, offset, size, caps, buf); + + GST_DEBUG_OBJECT (tee, "got return value %d", res); + + gst_object_unref (pad); + + GST_OBJECT_LOCK (tee); + if (GST_ELEMENT_CAST (tee)->pads_cookie != cookie) { + GST_DEBUG_OBJECT (tee, "pad list changed, restart"); + /* pad list changed, restart. If the pad alloc function returned OK we + * need to unref the buffer */ + if (res == GST_FLOW_OK) + gst_buffer_unref (*buf); + goto retry; + } + if (res == GST_FLOW_OK) { + GST_DEBUG_OBJECT (tee, "we have a buffer on pad %s:%s", + GST_DEBUG_PAD_NAME (pad)); + /* we have a buffer, keep the pad for later and exit the loop. */ + tee->allocpad = pad; + break; + } + /* no valid buffer, try another pad */ + pads = g_list_next (pads); + } + + return res; +} + static GstFlowReturn gst_tee_buffer_alloc (GstPad * pad, guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf) @@ -358,100 +434,162 @@ gst_tee_buffer_alloc (GstPad * pad, guint64 offset, guint size, tee = GST_TEE (GST_PAD_PARENT (pad)); + res = GST_FLOW_NOT_LINKED; + GST_OBJECT_LOCK (tee); - if ((allocpad = tee->allocpad)) + if ((allocpad = tee->allocpad)) { + /* if we had a previous pad we used for allocating a buffer, continue using + * it. */ + GST_DEBUG_OBJECT (tee, "using pad %s:%s for alloc", + GST_DEBUG_PAD_NAME (allocpad)); gst_object_ref (allocpad); - GST_OBJECT_UNLOCK (tee); + GST_OBJECT_UNLOCK (tee); - if (allocpad) { res = gst_pad_alloc_buffer (allocpad, offset, size, caps, buf); gst_object_unref (allocpad); - } else { - res = GST_FLOW_OK; - *buf = NULL; + + GST_OBJECT_LOCK (tee); } + /* either we failed to alloc on the the previous pad or we did not have a + * previous pad. */ + if (res == GST_FLOW_NOT_LINKED) { + /* find a new pad to alloc a buffer on */ + GST_DEBUG_OBJECT (tee, "finding pad for alloc"); + res = gst_tee_find_buffer_alloc (tee, offset, size, caps, buf); + } + GST_OBJECT_UNLOCK (tee); + return res; } -typedef struct -{ - GstTee *tee; - GstBuffer *buffer; -} PushData; - -static gboolean -gst_tee_do_push (GstPad * pad, GValue * ret, PushData * data) +static GstFlowReturn +gst_tee_do_push (GstTee * tee, GstPad * pad, GstBuffer * buffer) { GstFlowReturn res; - GstTee *tee = data->tee; - - if (G_UNLIKELY (!data->tee->silent)) { - GstBuffer *buf = data->buffer; + if (G_UNLIKELY (!tee->silent)) { GST_OBJECT_LOCK (tee); g_free (tee->last_message); tee->last_message = g_strdup_printf ("chain ******* (%s:%s)t (%d bytes, %" G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad), - GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf), buf); + GST_BUFFER_SIZE (buffer), GST_BUFFER_TIMESTAMP (buffer), buffer); GST_OBJECT_UNLOCK (tee); g_object_notify (G_OBJECT (tee), "last_message"); } /* Push */ - if (pad == data->tee->pull_pad) { + if (pad == tee->pull_pad) { + /* don't push on the pad we're pulling from */ res = GST_FLOW_OK; } else { - res = gst_pad_push (pad, gst_buffer_ref (data->buffer)); - GST_LOG_OBJECT (tee, "Pushing buffer %p to %" GST_PTR_FORMAT - " yielded result=%d", data->buffer, pad, res); + res = gst_pad_push (pad, gst_buffer_ref (buffer)); } + return res; +} - /* If it's fatal or OK, or if ret is currently - * not-linked, we overwrite the previous value */ - if (GST_FLOW_IS_FATAL (res) || (res == GST_FLOW_OK) || - (g_value_get_enum (ret) == GST_FLOW_NOT_LINKED)) { - GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", - g_value_get_enum (ret), res); - g_value_set_enum (ret, res); - } +static void +clear_pads (GstPad * pad, GstTee * tee) +{ + PushData *data; - gst_object_unref (pad); + data = g_object_get_qdata (G_OBJECT (pad), push_data); + + /* the data must be there or we have a screwed up internal state */ + g_assert (data != NULL); - /* Stop iterating if flow return is fatal */ - return (!GST_FLOW_IS_FATAL (res)); + data->pushed = FALSE; + data->result = GST_FLOW_NOT_LINKED; } static GstFlowReturn gst_tee_handle_buffer (GstTee * tee, GstBuffer * buffer) { - GstIterator *iter; - PushData data; - GValue ret = { 0, }; - GstIteratorResult res; + GList *pads; + guint32 cookie; + GstFlowReturn ret, cret; tee->offset += GST_BUFFER_SIZE (buffer); - g_value_init (&ret, GST_TYPE_FLOW_RETURN); - g_value_set_enum (&ret, GST_FLOW_NOT_LINKED); - iter = gst_element_iterate_src_pads (GST_ELEMENT (tee)); - data.tee = tee; - data.buffer = buffer; - - GST_LOG_OBJECT (tee, "Starting to push buffer %p", buffer); - /* FIXME: Not sure how tee would handle RESEND buffer from some of the - * pads but not from others. */ - res = gst_iterator_fold (iter, (GstIteratorFoldFunction) gst_tee_do_push, - &ret, &data); - gst_iterator_free (iter); - - GST_LOG_OBJECT (tee, "Pushing buffer %p yielded result=%d", buffer, - g_value_get_enum (&ret)); + GST_OBJECT_LOCK (tee); + /* mark all pads as 'not pushed on yet' */ + g_list_foreach (GST_ELEMENT_CAST (tee)->srcpads, (GFunc) clear_pads, tee); + +restart: + cret = GST_FLOW_NOT_LINKED; + pads = GST_ELEMENT_CAST (tee)->srcpads; + cookie = GST_ELEMENT_CAST (tee)->pads_cookie; + + while (pads) { + GstPad *pad; + PushData *data; + + pad = GST_PAD_CAST (pads->data); + + /* get the private data, something is really wrong with the internal state + * when it is not there */ + data = g_object_get_qdata (G_OBJECT (pad), push_data); + + g_assert (data != NULL); + + if (!data->pushed) { + /* not yet pushed, release lock and start pushing */ + gst_object_ref (pad); + GST_OBJECT_UNLOCK (tee); + + GST_LOG_OBJECT (tee, "Starting to push buffer %p", buffer); + + ret = gst_tee_do_push (tee, pad, buffer); + + GST_LOG_OBJECT (tee, "Pushing buffer %p yielded result %s", buffer, + gst_flow_get_name (ret)); + + GST_OBJECT_LOCK (tee); + /* keep track of which pad we pushed and the result value. We need to do + * this before we release the refcount on the pad, the PushData is + * destroyed when the last ref of the pad goes away. */ + data->pushed = TRUE; + data->result = ret; + gst_object_unref (pad); + } else { + /* already pushed, use previous return value */ + ret = data->result; + GST_LOG_OBJECT (tee, "pad already pushed with %s", + gst_flow_get_name (ret)); + } + /* stop pushing more buffers when we have a fatal error */ + if (GST_FLOW_IS_FATAL (ret)) + goto error; + + /* keep all other return values, overwriting the previous one */ + GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret); + if (cret == GST_FLOW_NOT_LINKED) + cret = ret; + + if (GST_ELEMENT_CAST (tee)->pads_cookie != cookie) { + GST_LOG_OBJECT (tee, "pad list changed"); + /* the list of pads changed, restart iteration. Pads that we already + * pushed on and are still in the new list, will not be pushed on + * again. */ + goto restart; + } + pads = g_list_next (pads); + } + GST_OBJECT_UNLOCK (tee); gst_buffer_unref (buffer); /* no need to unset gvalue */ - return g_value_get_enum (&ret); + return cret; + + /* ERRORS */ +error: + { + GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret)); + gst_buffer_unref (buffer); + GST_OBJECT_UNLOCK (tee); + return ret; + } } static GstFlowReturn @@ -462,8 +600,12 @@ gst_tee_chain (GstPad * pad, GstBuffer * buffer) tee = GST_TEE (gst_pad_get_parent (pad)); + GST_DEBUG_OBJECT (tee, "received buffer %p", buffer); + res = gst_tee_handle_buffer (tee, buffer); + GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res)); + gst_object_unref (tee); return res; diff --git a/tests/check/Makefile.am b/tests/check/Makefile.am index 34f379e..900398a 100644 --- a/tests/check/Makefile.am +++ b/tests/check/Makefile.am @@ -62,6 +62,7 @@ REGISTRY_CHECKS = \ elements/filesrc \ elements/identity \ elements/multiqueue \ + elements/tee \ libs/basesrc \ libs/controller \ libs/typefindhelper \ diff --git a/tests/check/elements/tee.c b/tests/check/elements/tee.c new file mode 100644 index 0000000..c996f12 --- /dev/null +++ b/tests/check/elements/tee.c @@ -0,0 +1,145 @@ +/* GStreamer + * + * unit test for tee + * + * Copyright (C) <2007> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include +#include +#include +#include + +#include + +static gint count1; +static gint count2; + +static void +handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count) +{ + *count = *count + 1; +} + +/* construct fakesrc num-buffers=3 ! tee name=t ! queue ! fakesink t. ! queue ! + * fakesink. Each fakesink should exactly receive 3 buffers. + */ +GST_START_TEST (test_num_buffers) +{ + GstElement *pipeline; + GstElement *f1, *f2; + gchar *desc; + GstBus *bus; + GstMessage *msg; + + desc = "fakesrc num-buffers=3 ! tee name=t ! queue ! fakesink name=f1 " + "t. ! queue ! fakesink name=f2"; + pipeline = gst_parse_launch (desc, NULL); + fail_if (pipeline == NULL); + + f1 = gst_bin_get_by_name (GST_BIN (pipeline), "f1"); + fail_if (f1 == NULL); + f2 = gst_bin_get_by_name (GST_BIN (pipeline), "f2"); + fail_if (f2 == NULL); + + count1 = 0; + count2 = 0; + + g_object_set (G_OBJECT (f1), "signal-handoffs", TRUE, NULL); + g_signal_connect (G_OBJECT (f1), "handoff", (GCallback) handoff, &count1); + g_object_set (G_OBJECT (f2), "signal-handoffs", TRUE, NULL); + g_signal_connect (G_OBJECT (f2), "handoff", (GCallback) handoff, &count2); + + bus = gst_element_get_bus (pipeline); + fail_if (bus == NULL); + gst_element_set_state (pipeline, GST_STATE_PLAYING); + + msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); + fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS); + gst_message_unref (msg); + + fail_if (count1 != 3); + fail_if (count2 != 3); + + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_object_unref (f1); + gst_object_unref (f2); + gst_object_unref (bus); + gst_object_unref (pipeline); +} + +GST_END_TEST; + +/* we use fakesrc ! tee ! fakesink and then randomly request/release and link + * some pads from tee. This should happily run without any errors. */ +GST_START_TEST (test_stress) +{ + GstElement *pipeline; + GstElement *tee; + gchar *desc; + GstBus *bus; + GstMessage *msg; + gint i; + + desc = "fakesrc num-buffers=100000 ! tee name=t ! queue ! fakesink"; + pipeline = gst_parse_launch (desc, NULL); + fail_if (pipeline == NULL); + + tee = gst_bin_get_by_name (GST_BIN (pipeline), "t"); + fail_if (tee == NULL); + + /* bring the pipeline to PLAYING, then start switching */ + bus = gst_element_get_bus (pipeline); + fail_if (bus == NULL); + gst_element_set_state (pipeline, GST_STATE_PLAYING); + + for (i = 0; i < 50000; i++) { + GstPad *pad; + + pad = gst_element_get_request_pad (tee, "src%d"); + gst_element_release_request_pad (tee, pad); + gst_object_unref (pad); + } + + /* now wait for completion or error */ + msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); + fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS); + gst_message_unref (msg); + + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_object_unref (tee); + gst_object_unref (bus); + gst_object_unref (pipeline); +} + +GST_END_TEST; + +Suite * +tee_suite (void) +{ + Suite *s = suite_create ("tee"); + TCase *tc_chain = tcase_create ("general"); + + suite_add_tcase (s, tc_chain); + tcase_add_test (tc_chain, test_num_buffers); + tcase_add_test (tc_chain, test_stress); + + return s; +} + +GST_CHECK_MAIN (tee); -- 2.7.4