From cf231073c241457e825c951114f21080c1588389 Mon Sep 17 00:00:00 2001 From: Thomas Vander Stichele Date: Mon, 12 Sep 2005 18:14:03 +0000 Subject: [PATCH] add a gst_element_set_state_async method that sets the state and starts a thread to make sure the state change comple... Original commit message from CVS: * check/gst/gstpipeline.c: (GST_START_TEST): * docs/gst/gstreamer-sections.txt: * gst/gstutils.c: (set_state_async_thread_func), (gst_element_set_state_async): * gst/gstutils.h: add a gst_element_set_state_async method that sets the state and starts a thread to make sure the state change completes as best as it can --- ChangeLog | 11 +++++ check/gst/gstpipeline.c | 52 +++++---------------- docs/gst/gstreamer-sections.txt | 1 + gst/gstutils.c | 86 ++++++++++++++++++++++++++++++++++ gst/gstutils.h | 100 ++++++++++++++++++++-------------------- tests/check/gst/gstpipeline.c | 52 +++++---------------- 6 files changed, 173 insertions(+), 129 deletions(-) diff --git a/ChangeLog b/ChangeLog index 9bd6394..9e5c208 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,16 @@ 2005-09-12 Thomas Vander Stichele + * check/gst/gstpipeline.c: (GST_START_TEST): + * docs/gst/gstreamer-sections.txt: + * gst/gstutils.c: (set_state_async_thread_func), + (gst_element_set_state_async): + * gst/gstutils.h: + add a "gst_element_set_state_async" method that + sets the state and starts a thread to make sure the state + change completes as best as it can + +2005-09-12 Thomas Vander Stichele + * check/gst/gstpipeline.c: (GST_START_TEST), (gst_pipeline_suite): codify design+behaviour in testsuite after discussion diff --git a/check/gst/gstpipeline.c b/check/gst/gstpipeline.c index 4f7dd86..698cabf 100644 --- a/check/gst/gstpipeline.c +++ b/check/gst/gstpipeline.c @@ -21,26 +21,6 @@ #include -#if 0 -static void -pop_messages (GstBus * bus, int count) -{ - GstMessage *message; - - int i; - - GST_DEBUG ("popping %d messages", count); - for (i = 0; i < count; ++i) { - fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1) - == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED"); - - message = gst_bus_pop (bus); - gst_message_unref (message); - } - GST_DEBUG ("popped %d messages", count); -} -#endif - /* an empty pipeline can go to PLAYING in one go */ GST_START_TEST (test_async_state_change_empty) { @@ -87,6 +67,7 @@ GST_START_TEST (test_async_state_change_fake) GstPipeline *pipeline; GstElement *src, *sink; GstBus *bus; + gboolean done = FALSE; pipeline = GST_PIPELINE (gst_pipeline_new (NULL)); fail_unless (pipeline != NULL, "Could not create pipeline"); @@ -100,37 +81,28 @@ GST_START_TEST (test_async_state_change_fake) bus = gst_pipeline_get_bus (pipeline); - fail_unless_equals_int (gst_element_set_state (GST_ELEMENT (pipeline), + fail_unless_equals_int (gst_element_set_state_async (GST_ELEMENT (pipeline), GST_STATE_PLAYING), GST_STATE_CHANGE_ASYNC); -#if 0 - /* FIXME: Wim is implementing a set_state_async, which will - * spawn a thread and make sure the pipeline gets to the - * requested final state, or errors out before */ - gst_bin_watch_for_state_change (GST_BIN (pipeline)); - - while ((type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1))) { + while (!done) { GstMessage *message; - GstMessageType type; GstState old, new; - GstState state, pending; - GstStateChange ret; - GTimeVal timeval; + GstMessageType type; + type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); message = gst_bus_pop (bus); gst_message_parse_state_changed (message, &old, &new); GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new); - g_print ("message\n"); - g_print ("%s: %d -> %d\n", GST_OBJECT_NAME (message->src), old, new); + if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING) + done = TRUE; gst_message_unref (message); - - timeval.tv_sec = 0; - timeval.tv_usec = 0; - ret = gst_element_get_state (GST_ELEMENT (pipeline), &state, &pending, - &timeval); } -#endif + g_object_set (G_OBJECT (pipeline), "play-timeout", 3 * GST_SECOND, NULL); + fail_unless_equals_int (gst_element_set_state (GST_ELEMENT (pipeline), + GST_STATE_NULL), GST_STATE_CHANGE_SUCCESS); + + gst_object_unref (bus); gst_object_unref (pipeline); } diff --git a/docs/gst/gstreamer-sections.txt b/docs/gst/gstreamer-sections.txt index e59aa3d..eca1e79 100644 --- a/docs/gst/gstreamer-sections.txt +++ b/docs/gst/gstreamer-sections.txt @@ -499,6 +499,7 @@ gst_element_set_locked_state gst_element_set_name gst_element_set_parent gst_element_set_state +gst_element_set_state_async gst_element_state_get_name gst_element_sync_state_with_parent gst_element_unlink diff --git a/gst/gstutils.c b/gst/gstutils.c index a43efaa..0383367 100644 --- a/gst/gstutils.c +++ b/gst/gstutils.c @@ -1888,6 +1888,92 @@ gst_bin_watch_for_state_change (GstBin * bin) } static void +set_state_async_thread_func (GstElement * element, gpointer statep) +{ + GstState state = GPOINTER_TO_INT (statep); + GstState current, pending; + GstStateChangeReturn ret = GST_STATE_CHANGE_ASYNC; + + GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, + "new thread ensuring state change to %s", + gst_element_state_get_name (state)); + + while (TRUE) { + /* wait indefinitely */ + ret = gst_element_get_state (element, ¤t, &pending, NULL); + GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, + "get_state returned %d, current %s, pending %s", ret, + gst_element_state_get_name (current), + gst_element_state_get_name (pending)); + + /* can only be SUCCESS or FAILURE */ + if (ret == GST_STATE_CHANGE_FAILURE) { + /* we can only break, hopefully an error message was posted as well */ + GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, + "FAILURE during state change"); + break; + } else if (ret == GST_STATE_CHANGE_SUCCESS) { + if (current == state) { + GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, + "successfully reached final state"); + break; + } + GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, + "setting target state %s again", gst_element_state_get_name (state)); + gst_element_set_state (element, state); + } else { + g_assert_not_reached (); + } + } + + GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, + "thread done waiting on state change"); + + gst_object_unref (element); +} + +/** + * gst_element_set_state_async: + * @element: a #GstElement to change state of + * @state: the element's new #GstState + * + * Sets the state of the element. This function will try to set the + * requested state by going through all the intermediary states and calling + * the class's state change function for each. If the state change returns + * #GST_STATE_CHANGE_ASYNC at any time, a thread will be started to + * monitor the state change and make sure the element is brought to the + * requested state. + * + * Returns: Result of the state change using #GstStateChangeReturn. + * + * MT safe. + */ +GstStateChangeReturn +gst_element_set_state_async (GstElement * element, GstState state) +{ + GstStateChangeReturn ret; + + ret = gst_element_set_state (element, state); + if (ret == GST_STATE_CHANGE_ASYNC) { + static GThreadPool *pool = NULL; + static GStaticMutex mutex = G_STATIC_MUTEX_INIT; + + GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, + "starting new thread to ensure state change to %s", + gst_element_state_get_name (state)); + g_static_mutex_lock (&mutex); + if (pool == NULL) + pool = g_thread_pool_new ((GFunc) set_state_async_thread_func, + GINT_TO_POINTER (state), -1, FALSE, NULL); + g_static_mutex_unlock (&mutex); + + g_thread_pool_push (pool, gst_object_ref (element), NULL); + } + + return ret; +} + +static void gst_element_populate_std_props (GObjectClass * klass, const gchar * prop_name, guint arg_id, GParamFlags flags) { diff --git a/gst/gstutils.h b/gst/gstutils.h index 4e2740b..2c25b48 100644 --- a/gst/gstutils.h +++ b/gst/gstutils.h @@ -30,14 +30,13 @@ G_BEGIN_DECLS void gst_util_set_value_from_string (GValue *value, const gchar *value_str); -void gst_util_set_object_arg (GObject *object, const gchar *name, const gchar *value); - -void gst_util_dump_mem (const guchar *mem, guint size); +void gst_util_set_object_arg (GObject *object, const gchar *name, const gchar *value); +void gst_util_dump_mem (const guchar *mem, guint size); guint64 gst_util_uint64_scale (guint64 val, guint64 num, guint64 denom); -void gst_print_pad_caps (GString *buf, gint indent, GstPad *pad); -void gst_print_element_args (GString *buf, gint indent, GstElement *element); +void gst_print_pad_caps (GString *buf, gint indent, GstPad *pad); +void gst_print_element_args (GString *buf, gint indent, GstElement *element); /* Macros for defining classes. Ideas taken from Bonobo, which took theirs @@ -98,53 +97,55 @@ type_as_function ## _get_type (void) \ * After this you will need to implement interface_as_function ## _supported * and interface_as_function ## _interface_init */ -#define GST_BOILERPLATE_WITH_INTERFACE(type, type_as_function, parent_type, \ - parent_type_as_macro, interface_type, interface_type_as_macro, \ - interface_as_function) \ - \ +#define GST_BOILERPLATE_WITH_INTERFACE(type, type_as_function, \ + parent_type, parent_type_as_macro, interface_type, \ + interface_type_as_macro, interface_as_function) \ + \ static void interface_as_function ## _interface_init (interface_type ## Class *klass); \ static gboolean interface_as_function ## _supported (type *object, GType iface_type); \ - \ -static void \ + \ +static void \ type_as_function ## _implements_interface_init (GstImplementsInterfaceClass *klass) \ -{ \ - klass->supported = (gpointer)interface_as_function ## _supported; \ -} \ - \ -static void \ -type_as_function ## _init_interfaces (GType type) \ -{ \ - static const GInterfaceInfo implements_iface_info = { \ - (GInterfaceInitFunc) type_as_function ## _implements_interface_init, \ - NULL, \ - NULL, \ - }; \ - static const GInterfaceInfo iface_info = { \ - (GInterfaceInitFunc) interface_as_function ## _interface_init, \ - NULL, \ - NULL, \ - }; \ - \ - g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE, \ - &implements_iface_info); \ - g_type_add_interface_static (type, interface_type_as_macro, &iface_info); \ -} \ - \ -GST_BOILERPLATE_FULL (type, type_as_function, parent_type, \ +{ \ + klass->supported = (gpointer)interface_as_function ## _supported; \ +} \ + \ +static void \ +type_as_function ## _init_interfaces (GType type) \ +{ \ + static const GInterfaceInfo implements_iface_info = { \ + (GInterfaceInitFunc) type_as_function ## _implements_interface_init,\ + NULL, \ + NULL, \ + }; \ + static const GInterfaceInfo iface_info = { \ + (GInterfaceInitFunc) interface_as_function ## _interface_init, \ + NULL, \ + NULL, \ + }; \ + \ + g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE, \ + &implements_iface_info); \ + g_type_add_interface_static (type, interface_type_as_macro, \ + &iface_info); \ +} \ + \ +GST_BOILERPLATE_FULL (type, type_as_function, parent_type, \ parent_type_as_macro, type_as_function ## _init_interfaces) /* Just call the parent handler. This assumes that there is a variable * named parent_class that points to the (duh!) parent class. Note that * this macro is not to be used with things that return something, use * the _WITH_DEFAULT version for that */ -#define GST_CALL_PARENT(parent_class_cast, name, args) \ - ((parent_class_cast(parent_class)->name != NULL) ? \ +#define GST_CALL_PARENT(parent_class_cast, name, args) \ + ((parent_class_cast(parent_class)->name != NULL) ? \ parent_class_cast(parent_class)->name args : (void) 0) /* Same as above, but in case there is no implementation, it evaluates * to def_return */ -#define GST_CALL_PARENT_WITH_DEFAULT(parent_class_cast, name, args, def_return) \ - ((parent_class_cast(parent_class)->name != NULL) ? \ +#define GST_CALL_PARENT_WITH_DEFAULT(parent_class_cast, name, args, \ + def_return) \ + ((parent_class_cast(parent_class)->name != NULL) ? \ parent_class_cast(parent_class)->name args : def_return) /* Define possibly unaligned memory access method whether the type of @@ -160,7 +161,7 @@ GST_BOILERPLATE_FULL (type, type_as_function, parent_type, #define GST_READ_UINT32_LE(data) _GST_GET (data, 32, LE) #define GST_READ_UINT16_BE(data) _GST_GET (data, 16, BE) #define GST_READ_UINT16_LE(data) _GST_GET (data, 16, LE) -#define GST_READ_UINT8(data) (* ((guint8 *) (data))) +#define GST_READ_UINT8(data) (* ((guint8 *) (data))) #define _GST_PUT(__data, __size, __end, __num) \ ((* (guint##__size *) (__data)) = GUINT##__size##_TO_##__end (__num)) @@ -171,7 +172,7 @@ GST_BOILERPLATE_FULL (type, type_as_function, parent_type, #define GST_WRITE_UINT32_LE(data, num) _GST_PUT(data, 32, LE, num) #define GST_WRITE_UINT16_BE(data, num) _GST_PUT(data, 16, BE, num) #define GST_WRITE_UINT16_LE(data, num) _GST_PUT(data, 16, LE, num) -#define GST_WRITE_UINT8(data, num) ((* (guint8 *) (data)) = (num)) +#define GST_WRITE_UINT8(data, num) ((* (guint8 *) (data)) = (num)) #else /* GST_HAVE_UNALIGNED_ACCESS */ @@ -278,13 +279,13 @@ GST_BOILERPLATE_FULL (type, type_as_function, parent_type, #define GST_ROUND_UP_32(num) (((num)+31)&~31) #define GST_ROUND_UP_64(num) (((num)+63)&~63) -void gst_object_default_error (GstObject * source, +void gst_object_default_error (GstObject * source, GError * error, gchar * debug); /* element functions */ -GstFlowReturn gst_element_abort_preroll (GstElement *element); -GstFlowReturn gst_element_finish_preroll (GstElement *element, GstPad *pad); +GstFlowReturn gst_element_abort_preroll (GstElement *element); +GstFlowReturn gst_element_finish_preroll (GstElement *element, GstPad *pad); void gst_element_create_all_pads (GstElement *element); GstPad* gst_element_get_compatible_pad (GstElement *element, GstPad *pad, @@ -312,6 +313,7 @@ void gst_element_unlink_pads (GstElement *src, const gboolean gst_element_link_pads_filtered (GstElement * src, const gchar * srcpadname, GstElement * dest, const gchar * destpadname, GstCaps *filter); +GstStateChangeReturn gst_element_set_state_async (GstElement * element, GstState state); /* util elementfactory functions */ gboolean gst_element_factory_can_src_caps(GstElementFactory *factory, const GstCaps *caps); @@ -324,8 +326,8 @@ gboolean gst_element_query_convert (GstElement *element, Gs GstFormat *dest_fmt, gint64 *dest_val); /* element class functions */ -void gst_element_class_install_std_props (GstElementClass * klass, - const gchar * first_name, ...); +void gst_element_class_install_std_props (GstElementClass * klass, + const gchar * first_name, ...); /* pad functions */ gboolean gst_pad_can_link (GstPad *srcpad, GstPad *sinkpad); @@ -338,7 +340,7 @@ gboolean gst_pad_proxy_setcaps (GstPad * pad, GstCaps * caps); GstElement* gst_pad_get_parent_element (GstPad *pad); /* flow */ -G_CONST_RETURN gchar* gst_flow_get_name (GstFlowReturn ret); +G_CONST_RETURN gchar* gst_flow_get_name (GstFlowReturn ret); /* util query functions */ @@ -348,8 +350,8 @@ gboolean gst_pad_query_convert (GstPad *pad, GstFormat GstFormat *dest_format, gint64 *dest_val); /* bin functions */ -void gst_bin_add_many (GstBin *bin, GstElement *element_1, ...); -void gst_bin_remove_many (GstBin *bin, GstElement *element_1, ...); +void gst_bin_add_many (GstBin *bin, GstElement *element_1, ...); +void gst_bin_remove_many (GstBin *bin, GstElement *element_1, ...); void gst_bin_watch_for_state_change (GstBin *bin); /* buffer functions */ diff --git a/tests/check/gst/gstpipeline.c b/tests/check/gst/gstpipeline.c index 4f7dd86..698cabf 100644 --- a/tests/check/gst/gstpipeline.c +++ b/tests/check/gst/gstpipeline.c @@ -21,26 +21,6 @@ #include -#if 0 -static void -pop_messages (GstBus * bus, int count) -{ - GstMessage *message; - - int i; - - GST_DEBUG ("popping %d messages", count); - for (i = 0; i < count; ++i) { - fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1) - == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED"); - - message = gst_bus_pop (bus); - gst_message_unref (message); - } - GST_DEBUG ("popped %d messages", count); -} -#endif - /* an empty pipeline can go to PLAYING in one go */ GST_START_TEST (test_async_state_change_empty) { @@ -87,6 +67,7 @@ GST_START_TEST (test_async_state_change_fake) GstPipeline *pipeline; GstElement *src, *sink; GstBus *bus; + gboolean done = FALSE; pipeline = GST_PIPELINE (gst_pipeline_new (NULL)); fail_unless (pipeline != NULL, "Could not create pipeline"); @@ -100,37 +81,28 @@ GST_START_TEST (test_async_state_change_fake) bus = gst_pipeline_get_bus (pipeline); - fail_unless_equals_int (gst_element_set_state (GST_ELEMENT (pipeline), + fail_unless_equals_int (gst_element_set_state_async (GST_ELEMENT (pipeline), GST_STATE_PLAYING), GST_STATE_CHANGE_ASYNC); -#if 0 - /* FIXME: Wim is implementing a set_state_async, which will - * spawn a thread and make sure the pipeline gets to the - * requested final state, or errors out before */ - gst_bin_watch_for_state_change (GST_BIN (pipeline)); - - while ((type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1))) { + while (!done) { GstMessage *message; - GstMessageType type; GstState old, new; - GstState state, pending; - GstStateChange ret; - GTimeVal timeval; + GstMessageType type; + type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); message = gst_bus_pop (bus); gst_message_parse_state_changed (message, &old, &new); GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new); - g_print ("message\n"); - g_print ("%s: %d -> %d\n", GST_OBJECT_NAME (message->src), old, new); + if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING) + done = TRUE; gst_message_unref (message); - - timeval.tv_sec = 0; - timeval.tv_usec = 0; - ret = gst_element_get_state (GST_ELEMENT (pipeline), &state, &pending, - &timeval); } -#endif + g_object_set (G_OBJECT (pipeline), "play-timeout", 3 * GST_SECOND, NULL); + fail_unless_equals_int (gst_element_set_state (GST_ELEMENT (pipeline), + GST_STATE_NULL), GST_STATE_CHANGE_SUCCESS); + + gst_object_unref (bus); gst_object_unref (pipeline); } -- 2.7.4