From 1c1af875d490f06d290ccdfdc8a331526044e452 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 19 Sep 2005 11:18:03 +0000 Subject: [PATCH] GstBusHandler -> GstBusFunc, return value has the same meaning as any other GSource (FALSE == remove source). Original commit message from CVS: * check/gst/gstbin.c: (pop_messages), (GST_START_TEST): * check/gst/gstbus.c: (message_func_eos), (message_func_app), (send_messages), (GST_START_TEST), (gstbus_suite): * check/gst/gstpipeline.c: (GST_START_TEST): * check/pipelines/cleanup.c: (run_pipeline): * check/pipelines/simple_launch_lines.c: (run_pipeline), (GST_START_TEST): * gst/gstbus.c: (gst_bus_have_pending), (gst_bus_source_prepare), (gst_bus_source_check), (gst_bus_source_dispatch), (gst_bus_create_watch), (gst_bus_add_watch_full), (gst_bus_add_watch), (poll_func), (poll_timeout), (gst_bus_poll): * gst/gstbus.h: * tools/gst-launch.c: (event_loop): * tools/gst-md5sum.c: (event_loop): GstBusHandler -> GstBusFunc, return value has the same meaning as any other GSource (FALSE == remove source). _add_watch() and _add_watch_full() now take a MessageType mask to only handle specific types of messages. _poll() returns the GstMessage instead of the message type to avoid race conditions. _have_pending() takes a MessageType mask now too. Added testsuite for multiple bus watches. Fix testsuites and applications for new bus API. --- ChangeLog | 26 +++++ check/gst/gstbin.c | 27 ++--- check/gst/gstbus.c | 86 ++++++++++++++- check/gst/gstpipeline.c | 18 +-- check/pipelines/cleanup.c | 11 +- check/pipelines/simple_launch_lines.c | 25 +++-- gst/gstbus.c | 165 +++++++++++++++------------- gst/gstbus.h | 34 +++--- tests/check/gst/gstbin.c | 27 ++--- tests/check/gst/gstbus.c | 86 ++++++++++++++- tests/check/gst/gstpipeline.c | 18 +-- tests/check/pipelines/cleanup.c | 11 +- tests/check/pipelines/simple-launch-lines.c | 25 +++-- tools/gst-launch.c | 10 +- tools/gst-md5sum.c | 6 +- 15 files changed, 397 insertions(+), 178 deletions(-) diff --git a/ChangeLog b/ChangeLog index ec11518..1cb2c0b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,29 @@ +2005-09-19 Wim Taymans + + * check/gst/gstbin.c: (pop_messages), (GST_START_TEST): + * check/gst/gstbus.c: (message_func_eos), (message_func_app), + (send_messages), (GST_START_TEST), (gstbus_suite): + * check/gst/gstpipeline.c: (GST_START_TEST): + * check/pipelines/cleanup.c: (run_pipeline): + * check/pipelines/simple_launch_lines.c: (run_pipeline), + (GST_START_TEST): + * gst/gstbus.c: (gst_bus_have_pending), (gst_bus_source_prepare), + (gst_bus_source_check), (gst_bus_source_dispatch), + (gst_bus_create_watch), (gst_bus_add_watch_full), + (gst_bus_add_watch), (poll_func), (poll_timeout), (gst_bus_poll): + * gst/gstbus.h: + * tools/gst-launch.c: (event_loop): + * tools/gst-md5sum.c: (event_loop): + GstBusHandler -> GstBusFunc, return value has the same meaning as + any other GSource (FALSE == remove source). + _add_watch() and _add_watch_full() now take a MessageType mask to + only handle specific types of messages. + _poll() returns the GstMessage instead of the message type to avoid + race conditions. + _have_pending() takes a MessageType mask now too. + Added testsuite for multiple bus watches. + Fix testsuites and applications for new bus API. + 2005-09-19 Thomas Vander Stichele * check/Makefile.am: diff --git a/check/gst/gstbin.c b/check/gst/gstbin.c index fa704be..16c6f62 100644 --- a/check/gst/gstbin.c +++ b/check/gst/gstbin.c @@ -31,10 +31,11 @@ pop_messages (GstBus * bus, int count) GST_DEBUG ("popping %d messages", count); for (i = 0; i < count; ++i) { - fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1) + message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); + + fail_unless (message && GST_MESSAGE_TYPE (message) == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED"); - message = gst_bus_pop (bus); gst_message_unref (message); } GST_DEBUG ("popped %d messages", count); @@ -121,11 +122,11 @@ GST_START_TEST (test_message_state_changed) ASSERT_OBJECT_REFCOUNT (bin, "bin", 2); /* get and unref the message, causing a decref on the bin */ - fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, - -1) == GST_MESSAGE_STATE_CHANGED, - "did not get GST_MESSAGE_STATE_CHANGED"); + message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); + + fail_unless (message && GST_MESSAGE_TYPE (message) + == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED"); - message = gst_bus_pop (bus); gst_message_unref (message); ASSERT_OBJECT_REFCOUNT (bin, "bin", 1); @@ -166,10 +167,10 @@ GST_START_TEST (test_message_state_changed_child) ASSERT_OBJECT_REFCOUNT (bin, "bin", 2); /* get and unref the message, causing a decref on the src */ - fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1) + message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); + fail_unless (message && GST_MESSAGE_TYPE (message) == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED"); - message = gst_bus_pop (bus); fail_unless (message->src == GST_OBJECT (src)); gst_message_unref (message); @@ -177,10 +178,10 @@ GST_START_TEST (test_message_state_changed_child) ASSERT_OBJECT_REFCOUNT (bin, "bin", 2); /* get and unref message 2, causing a decref on the bin */ - fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1) + message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); + fail_unless (message && GST_MESSAGE_TYPE (message) == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED"); - message = gst_bus_pop (bus); fail_unless (message->src == GST_OBJECT (bin)); gst_message_unref (message); @@ -341,7 +342,7 @@ GST_START_TEST (test_watch_for_state_change) pop_messages (bus, 5); - fail_unless (gst_bus_have_pending (bus) == FALSE, + fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE, "Unexpected messages on bus"); gst_bin_watch_for_state_change (GST_BIN (bin)); @@ -349,7 +350,7 @@ GST_START_TEST (test_watch_for_state_change) /* should get the bin's state change message now */ pop_messages (bus, 1); - fail_unless (gst_bus_have_pending (bus) == FALSE, + fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE, "Unexpected messages on bus"); fail_unless (gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING) @@ -364,7 +365,7 @@ GST_START_TEST (test_watch_for_state_change) pop_messages (bus, 3); - fail_unless (gst_bus_have_pending (bus) == FALSE, + fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE, "Unexpected messages on bus"); /* setting bin to NULL flushes the bus automatically */ diff --git a/check/gst/gstbus.c b/check/gst/gstbus.c index 54ea722..8fb3609 100644 --- a/check/gst/gstbus.c +++ b/check/gst/gstbus.c @@ -19,11 +19,10 @@ * Boston, MA 02111-1307, USA. */ - #include - static GstBus *test_bus = NULL; +static GMainLoop *main_loop; #define NUM_MESSAGES 1000 #define NUM_THREADS 10 @@ -99,8 +98,86 @@ GST_START_TEST (test_hammer_bus) gst_object_unref ((GstObject *) test_bus); } -GST_END_TEST Suite * -gstbus_suite (void) +GST_END_TEST static gboolean +message_func_eos (GstBus * bus, GstMessage * message, gpointer data) +{ + const GstStructure *s; + gint i; + + g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE); + + GST_DEBUG ("got EOS message"); + + s = gst_message_get_structure (message); + if (!gst_structure_get_int (s, "msg_id", &i)) + g_critical ("Invalid message"); + + return i != 9; +} + +static gboolean +message_func_app (GstBus * bus, GstMessage * message, gpointer data) +{ + const GstStructure *s; + gint i; + + g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION, + FALSE); + + GST_DEBUG ("got APP message"); + + s = gst_message_get_structure (message); + if (!gst_structure_get_int (s, "msg_id", &i)) + g_critical ("Invalid message"); + + return i != 9; +} + +static gboolean +send_messages (gpointer data) +{ + GstMessage *m; + GstStructure *s; + gint i; + + for (i = 0; i < 10; i++) { + s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL); + m = gst_message_new_application (NULL, s); + gst_bus_post (test_bus, m); + s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL); + m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s); + gst_bus_post (test_bus, m); + } + + return FALSE; +} + +/* test id adding two watches for different message types calls the + * respective callbacks. */ +GST_START_TEST (test_watch) +{ + guint id1, id2; + + test_bus = gst_bus_new (); + + main_loop = g_main_loop_new (NULL, FALSE); + + id2 = gst_bus_add_watch (test_bus, GST_MESSAGE_EOS, message_func_eos, NULL); + id1 = + gst_bus_add_watch (test_bus, GST_MESSAGE_APPLICATION, message_func_app, + NULL); + + g_idle_add ((GSourceFunc) send_messages, NULL); + while (g_main_context_pending (NULL)) + g_main_context_iteration (NULL, FALSE); + + g_source_remove (id1); + g_source_remove (id2); + g_main_loop_unref (main_loop); + + gst_object_unref ((GstObject *) test_bus); +} +GST_END_TEST Suite * gstbus_suite (void) { Suite *s = suite_create ("GstBus"); TCase *tc_chain = tcase_create ("stresstest"); @@ -109,6 +186,7 @@ gstbus_suite (void) suite_add_tcase (s, tc_chain); tcase_add_test (tc_chain, test_hammer_bus); + tcase_add_test (tc_chain, test_watch); return s; } diff --git a/check/gst/gstpipeline.c b/check/gst/gstpipeline.c index 698cabf..ab68db1 100644 --- a/check/gst/gstpipeline.c +++ b/check/gst/gstpipeline.c @@ -87,15 +87,15 @@ GST_START_TEST (test_async_state_change_fake) while (!done) { GstMessage *message; GstState old, new; - GstMessageType type; - - type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); - message = gst_bus_pop (bus); - gst_message_parse_state_changed (message, &old, &new); - GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new); - if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING) - done = TRUE; - gst_message_unref (message); + + message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); + if (message) { + gst_message_parse_state_changed (message, &old, &new); + GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new); + if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING) + done = TRUE; + gst_message_unref (message); + } } g_object_set (G_OBJECT (pipeline), "play-timeout", 3 * GST_SECOND, NULL); diff --git a/check/pipelines/cleanup.c b/check/pipelines/cleanup.c index 6872447..5024db4 100644 --- a/check/pipelines/cleanup.c +++ b/check/pipelines/cleanup.c @@ -48,11 +48,14 @@ run_pipeline (GstElement * pipe, gchar * descr, gst_element_set_state (pipe, GST_STATE_PLAYING); while (1) { - revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2); + GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2); - /* always have to pop the message before getting back into poll */ - if (revent != GST_MESSAGE_UNKNOWN) - gst_message_unref (gst_bus_pop (bus)); + if (message) { + revent = GST_MESSAGE_TYPE (message); + gst_message_unref (message); + } else { + revent = GST_MESSAGE_UNKNOWN; + } if (revent == tevent) { break; diff --git a/check/pipelines/simple_launch_lines.c b/check/pipelines/simple_launch_lines.c index 293e887..c40b88f 100644 --- a/check/pipelines/simple_launch_lines.c +++ b/check/pipelines/simple_launch_lines.c @@ -59,11 +59,14 @@ run_pipeline (GstElement * pipe, gchar * descr, } while (1) { - revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2); + GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2); - /* always have to pop the message before getting back into poll */ - if (revent != GST_MESSAGE_UNKNOWN) - gst_message_unref (gst_bus_pop (bus)); + if (message) { + revent = GST_MESSAGE_TYPE (message); + gst_message_unref (message); + } else { + revent = GST_MESSAGE_UNKNOWN; + } if (revent == tevent) { break; @@ -138,6 +141,7 @@ GST_START_TEST (test_stop_from_app) GstElement *fakesrc, *fakesink, *pipeline; GstBus *bus; GstMessageType revent; + GstMessage *message; assert_live_count (GST_TYPE_BUFFER, 0); @@ -159,17 +163,22 @@ GST_START_TEST (test_stop_from_app) g_assert (bus); /* will time out after half a second */ - revent = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2); - + message = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2); + if (message) { + revent = GST_MESSAGE_TYPE (message); + gst_message_unref (message); + } else { + revent = GST_MESSAGE_UNKNOWN; + } g_return_if_fail (revent == GST_MESSAGE_APPLICATION); - gst_message_unref (gst_bus_pop (bus)); gst_element_set_state (pipeline, GST_STATE_NULL); gst_object_unref (pipeline); assert_live_count (GST_TYPE_BUFFER, 0); } -GST_END_TEST Suite * simple_launch_lines_suite (void) +GST_END_TEST Suite * +simple_launch_lines_suite (void) { Suite *s = suite_create ("Pipelines"); TCase *tc_chain = tcase_create ("linear"); diff --git a/gst/gstbus.c b/gst/gstbus.c index d3176e1..38fafc1 100644 --- a/gst/gstbus.c +++ b/gst/gstbus.c @@ -42,8 +42,8 @@ * up to the specified timeout value until one of the specified messages types * is posted on the bus. The application can then _pop() the messages from the * bus to handle them. - * Alternatively the application can register an asynchronous bus handler using - * gst_bus_add_watch_full() orgst_bus_add_watch(). This handler will receive + * Alternatively the application can register an asynchronous bus function using + * gst_bus_add_watch_full() orgst_bus_add_watch(). This function will receive * messages a short while after they have been posted. * * It is also possible to get messages from the bus without any thread @@ -52,12 +52,7 @@ * message on the bus. This should only be used if the application is able * to deal with messages from different threads. * - * It is important to make sure that every message is popped from the bus at - * some point in time. Otherwise it will be presented to the watches (#GSource - * elements) again and again. One way to implement it is having one watch with a - * low priority (see gst_add_watch_full()) that pops all messages. - * - * Every #GstPipeline has one bus. + * Every #GstBin has one bus. */ #include @@ -324,26 +319,35 @@ is_flushing: /** * gst_bus_have_pending: * @bus: a #GstBus to check + * @events: a mask of #GstMessageType, representing the set of message types to + * watch for. * - * Check if there are pending messages on the bus that should be - * handled. + * Check if there are pending messages on the bus of the given types that + * should be handled. * * Returns: TRUE if there are messages on the bus to be handled. * * MT safe. */ gboolean -gst_bus_have_pending (GstBus * bus) +gst_bus_have_pending (GstBus * bus, GstMessageType events) { - gint length; + GstMessage *message; + gboolean result; g_return_val_if_fail (GST_IS_BUS (bus), FALSE); g_mutex_lock (bus->queue_lock); - length = g_queue_get_length (bus->queue); + /* see if there is a message on the bus that satisfies the + * event mask */ + message = g_queue_peek_head (bus->queue); + if (message) + result = (GST_MESSAGE_TYPE (message) & events) != 0; + else + result = FALSE; g_mutex_unlock (bus->queue_lock); - return (length > 0); + return result; } /** @@ -470,29 +474,34 @@ typedef struct { GSource source; GstBus *bus; + GstMessageType events; } GstBusSource; static gboolean gst_bus_source_prepare (GSource * source, gint * timeout) { + GstBusSource *bsrc = (GstBusSource *) source; + *timeout = -1; - return gst_bus_have_pending (((GstBusSource *) source)->bus); + return gst_bus_have_pending (bsrc->bus, bsrc->events); } static gboolean gst_bus_source_check (GSource * source) { - return gst_bus_have_pending (((GstBusSource *) source)->bus); + GstBusSource *bsrc = (GstBusSource *) source; + + return gst_bus_have_pending (bsrc->bus, bsrc->events); } static gboolean gst_bus_source_dispatch (GSource * source, GSourceFunc callback, gpointer user_data) { - GstBusHandler handler = (GstBusHandler) callback; + GstBusFunc handler = (GstBusFunc) callback; GstBusSource *bsource = (GstBusSource *) source; GstMessage *message; - gboolean needs_pop = TRUE; + gboolean keep; GstBus *bus; g_return_val_if_fail (bsource != NULL, FALSE); @@ -501,34 +510,20 @@ gst_bus_source_dispatch (GSource * source, GSourceFunc callback, g_return_val_if_fail (GST_IS_BUS (bus), FALSE); - message = gst_bus_peek (bus); - - GST_DEBUG_OBJECT (bus, "source %p have message %p", source, message); - - g_return_val_if_fail (message != NULL, TRUE); + message = gst_bus_pop (bus); + g_return_val_if_fail (message != NULL, FALSE); if (!handler) goto no_handler; GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message); - needs_pop = handler (bus, message, user_data); + keep = handler (bus, message, user_data); gst_message_unref (message); - GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, needs_pop); - if (needs_pop) { - message = gst_bus_pop (bus); - if (message) { - gst_message_unref (message); - } else { - /* after executing the handler, the app could have disposed - * the pipeline and set the bus to flushing. It is possible - * then that there are no more messages on the bus. this is - * not a problem. */ - GST_DEBUG ("handler requested pop but no message on the bus"); - } - } - return TRUE; + GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, keep); + + return keep; no_handler: { @@ -558,13 +553,19 @@ static GSourceFuncs gst_bus_source_funcs = { /** * gst_bus_create_watch: * @bus: a #GstBus to create the watch for + * @events: a mask of #GstMessageType, representing the set of message types to + * watch for. * - * Create watch for this bus. + * Create watch for this bus. The source will only act on messages of the + * given types, messages of other types will simply remain on the bus and + * this GSource will not be dispatched again before the message is popped off + * the bus. For this reason one typically has a low priority GSource that + * pops all remaining messages from the bus not handled by the other GSources. * * Returns: A #GSource that can be added to a mainloop. */ GSource * -gst_bus_create_watch (GstBus * bus) +gst_bus_create_watch (GstBus * bus, GstMessageType events) { GstBusSource *source; @@ -574,6 +575,7 @@ gst_bus_create_watch (GstBus * bus) sizeof (GstBusSource)); gst_object_ref (bus); source->bus = bus; + source->events = events; return (GSource *) source; } @@ -582,34 +584,37 @@ gst_bus_create_watch (GstBus * bus) * gst_bus_add_watch_full: * @bus: a #GstBus to create the watch for. * @priority: The priority of the watch. - * @handler: A function to call when a message is received. - * @user_data: user data passed to @handler. + * @events: a mask of #GstMessageType, representing the set of message types to + * watch for. + * @func: A function to call when a message is received. + * @user_data: user data passed to @func. * @notify: the function to call when the source is removed. * - * Adds the bus to the mainloop with the given priority. If the handler returns - * TRUE, the message will then be popped off the queue. When the handler is - * called, the message belongs to the caller; if you want to keep a copy of it, - * call gst_message_ref before leaving the handler. + * Adds the bus to the mainloop with the given priority. If the func returns + * FALSE, the func will be removed. + * + * When the func is called, the message belongs to the caller; if you want to + * keep a copy of it, call gst_message_ref before leaving the func. * * Returns: The event source id. * * MT safe. */ guint -gst_bus_add_watch_full (GstBus * bus, gint priority, - GstBusHandler handler, gpointer user_data, GDestroyNotify notify) +gst_bus_add_watch_full (GstBus * bus, gint priority, GstMessageType events, + GstBusFunc func, gpointer user_data, GDestroyNotify notify) { guint id; GSource *source; g_return_val_if_fail (GST_IS_BUS (bus), 0); - source = gst_bus_create_watch (bus); + source = gst_bus_create_watch (bus, events); if (priority != G_PRIORITY_DEFAULT) g_source_set_priority (source, priority); - g_source_set_callback (source, (GSourceFunc) handler, user_data, notify); + g_source_set_callback (source, (GSourceFunc) func, user_data, notify); id = g_source_attach (source, NULL); g_source_unref (source); @@ -621,8 +626,10 @@ gst_bus_add_watch_full (GstBus * bus, gint priority, /** * gst_bus_add_watch: * @bus: a #GstBus to create the watch for - * @handler: A function to call when a message is received. - * @user_data: user data passed to @handler. + * @events: a mask of #GstMessageType, representing the set of message types to + * watch for. + * @func: A function to call when a message is received. + * @user_data: user data passed to @func. * * Adds the bus to the mainloop with the default priority. * @@ -631,10 +638,11 @@ gst_bus_add_watch_full (GstBus * bus, gint priority, * MT safe. */ guint -gst_bus_add_watch (GstBus * bus, GstBusHandler handler, gpointer user_data) +gst_bus_add_watch (GstBus * bus, GstMessageType events, GstBusFunc func, + gpointer user_data) { - return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, handler, user_data, - NULL); + return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, events, func, + user_data, NULL); } typedef struct @@ -643,25 +651,26 @@ typedef struct guint timeout_id; gboolean source_running; GstMessageType events; - GstMessageType revent; + GstMessage *message; } GstBusPollData; static gboolean -poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data) +poll_func (GstBus * bus, GstMessage * message, GstBusPollData * poll_data) { if (!g_main_loop_is_running (poll_data->loop)) - return FALSE; + return TRUE; if (GST_MESSAGE_TYPE (message) & poll_data->events) { - poll_data->revent = GST_MESSAGE_TYPE (message); + g_return_val_if_fail (poll_data->message == NULL, FALSE); + /* keep ref to message */ + poll_data->message = gst_message_ref (message); g_main_loop_quit (poll_data->loop); - - /* keep the message on the queue */ - return FALSE; } else { - /* pop and unref the message */ - return TRUE; + /* don't remove the source. */ } + /* we always keep the source alive so that we don't accidentialy + * free the poll_data */ + return TRUE; } static gboolean @@ -669,8 +678,9 @@ poll_timeout (GstBusPollData * poll_data) { g_main_loop_quit (poll_data->loop); - /* returning FALSE will remove the source id */ - return FALSE; + /* we don't remove the GSource as this would free our poll_data, + * which we still need */ + return TRUE; } static void @@ -706,24 +716,21 @@ poll_destroy_timeout (GstBusPollData * poll_data) * * This function will enter the default mainloop while polling. * - * Returns: The type of the message that was received, or GST_MESSAGE_UNKNOWN if - * the poll timed out. The message will remain in the bus queue; you will need - * to gst_bus_pop() it off before entering gst_bus_poll() again. + * Returns: The message that was received, or NULL if the poll timed out. + * The message is taken from the bus and needs to be unreffed after usage. */ -GstMessageType +GstMessage * gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout) { GstBusPollData *poll_data; - GstMessageType ret; + GstMessage *ret; guint id; poll_data = g_new0 (GstBusPollData, 1); - g_return_val_if_fail (poll_data != NULL, GST_MESSAGE_UNKNOWN); - poll_data->source_running = TRUE; poll_data->loop = g_main_loop_new (NULL, FALSE); poll_data->events = events; - poll_data->revent = GST_MESSAGE_UNKNOWN; + poll_data->message = NULL; if (timeout >= 0) poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE, @@ -732,10 +739,12 @@ gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout) else poll_data->timeout_id = 0; - id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE, - (GstBusHandler) poll_handler, poll_data, (GDestroyNotify) poll_destroy); + id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE, GST_MESSAGE_ANY, + (GstBusFunc) poll_func, poll_data, (GDestroyNotify) poll_destroy); + g_main_loop_run (poll_data->loop); - ret = poll_data->revent; + /* holds a ref */ + ret = poll_data->message; if (poll_data->timeout_id) g_source_remove (poll_data->timeout_id); @@ -743,7 +752,7 @@ gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout) /* poll_data may get destroyed at any time now */ g_source_remove (id); - GST_DEBUG_OBJECT (bus, "finished poll with messagetype %d", ret); + GST_DEBUG_OBJECT (bus, "finished poll with message %p", ret); return ret; } diff --git a/gst/gstbus.h b/gst/gstbus.h index 50c6877..ff7625e 100644 --- a/gst/gstbus.h +++ b/gst/gstbus.h @@ -59,24 +59,29 @@ typedef enum * @data: user data that has been given, when registering the handler * * Handler will be invoked synchronously, when a new message has been injected - * into the bus. + * into the bus. This function is mostly used internally. Only one sync handler + * can be attached to a given bus. * * Returns: #GstBusSyncReply stating what to do with the message */ -typedef GstBusSyncReply (*GstBusSyncHandler) (GstBus * bus, GstMessage * message, gpointer data); +typedef GstBusSyncReply (*GstBusSyncHandler) (GstBus * bus, GstMessage * message, gpointer data); + /** - * GstBusHandler: + * GstBusFunc: * @bus: the #GstBus that sent the message * @message: the #GstMessage * @data: user data that has been given, when registering the handler * - * Handler will be invoked asynchronously, after a new message has been injected - * into the bus. Return %TRUE if the message has been handled. It will then be - * taken from the bus and _unref()'ed. + * Specifies the type of function passed to #gst_bus_add_watch() or + * #gst_bus_add_watch_full(), which is called from the mainloop when a message + * is available on the bus. + * + * The message passed to the function will be unreffed after execution of this + * function so it should not be freed in the function. * - * Returns: %TRUE if message should be taken from the bus + * Returns: %FALSE if the event source should be removed. */ -typedef gboolean (*GstBusHandler) (GstBus * bus, GstMessage * message, gpointer data); +typedef gboolean (*GstBusFunc) (GstBus * bus, GstMessage * message, gpointer data); struct _GstBus { @@ -107,7 +112,7 @@ GstBus* gst_bus_new (void); gboolean gst_bus_post (GstBus * bus, GstMessage * message); -gboolean gst_bus_have_pending (GstBus * bus); +gboolean gst_bus_have_pending (GstBus * bus, GstMessageType events); GstMessage * gst_bus_peek (GstBus * bus); GstMessage * gst_bus_pop (GstBus * bus); void gst_bus_set_flushing (GstBus * bus, gboolean flushing); @@ -115,16 +120,19 @@ void gst_bus_set_flushing (GstBus * bus, gboolean flushing); void gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data); -GSource * gst_bus_create_watch (GstBus * bus); +GSource * gst_bus_create_watch (GstBus * bus, GstMessageType events); guint gst_bus_add_watch_full (GstBus * bus, gint priority, - GstBusHandler handler, + GstMessageType events, + GstBusFunc func, gpointer user_data, GDestroyNotify notify); guint gst_bus_add_watch (GstBus * bus, - GstBusHandler handler, + GstMessageType events, + GstBusFunc func, gpointer user_data); -GstMessageType gst_bus_poll (GstBus *bus, GstMessageType events, + +GstMessage* gst_bus_poll (GstBus *bus, GstMessageType events, GstClockTimeDiff timeout); G_END_DECLS diff --git a/tests/check/gst/gstbin.c b/tests/check/gst/gstbin.c index fa704be..16c6f62 100644 --- a/tests/check/gst/gstbin.c +++ b/tests/check/gst/gstbin.c @@ -31,10 +31,11 @@ pop_messages (GstBus * bus, int count) GST_DEBUG ("popping %d messages", count); for (i = 0; i < count; ++i) { - fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1) + message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); + + fail_unless (message && GST_MESSAGE_TYPE (message) == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED"); - message = gst_bus_pop (bus); gst_message_unref (message); } GST_DEBUG ("popped %d messages", count); @@ -121,11 +122,11 @@ GST_START_TEST (test_message_state_changed) ASSERT_OBJECT_REFCOUNT (bin, "bin", 2); /* get and unref the message, causing a decref on the bin */ - fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, - -1) == GST_MESSAGE_STATE_CHANGED, - "did not get GST_MESSAGE_STATE_CHANGED"); + message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); + + fail_unless (message && GST_MESSAGE_TYPE (message) + == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED"); - message = gst_bus_pop (bus); gst_message_unref (message); ASSERT_OBJECT_REFCOUNT (bin, "bin", 1); @@ -166,10 +167,10 @@ GST_START_TEST (test_message_state_changed_child) ASSERT_OBJECT_REFCOUNT (bin, "bin", 2); /* get and unref the message, causing a decref on the src */ - fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1) + message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); + fail_unless (message && GST_MESSAGE_TYPE (message) == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED"); - message = gst_bus_pop (bus); fail_unless (message->src == GST_OBJECT (src)); gst_message_unref (message); @@ -177,10 +178,10 @@ GST_START_TEST (test_message_state_changed_child) ASSERT_OBJECT_REFCOUNT (bin, "bin", 2); /* get and unref message 2, causing a decref on the bin */ - fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1) + message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); + fail_unless (message && GST_MESSAGE_TYPE (message) == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED"); - message = gst_bus_pop (bus); fail_unless (message->src == GST_OBJECT (bin)); gst_message_unref (message); @@ -341,7 +342,7 @@ GST_START_TEST (test_watch_for_state_change) pop_messages (bus, 5); - fail_unless (gst_bus_have_pending (bus) == FALSE, + fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE, "Unexpected messages on bus"); gst_bin_watch_for_state_change (GST_BIN (bin)); @@ -349,7 +350,7 @@ GST_START_TEST (test_watch_for_state_change) /* should get the bin's state change message now */ pop_messages (bus, 1); - fail_unless (gst_bus_have_pending (bus) == FALSE, + fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE, "Unexpected messages on bus"); fail_unless (gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING) @@ -364,7 +365,7 @@ GST_START_TEST (test_watch_for_state_change) pop_messages (bus, 3); - fail_unless (gst_bus_have_pending (bus) == FALSE, + fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE, "Unexpected messages on bus"); /* setting bin to NULL flushes the bus automatically */ diff --git a/tests/check/gst/gstbus.c b/tests/check/gst/gstbus.c index 54ea722..8fb3609 100644 --- a/tests/check/gst/gstbus.c +++ b/tests/check/gst/gstbus.c @@ -19,11 +19,10 @@ * Boston, MA 02111-1307, USA. */ - #include - static GstBus *test_bus = NULL; +static GMainLoop *main_loop; #define NUM_MESSAGES 1000 #define NUM_THREADS 10 @@ -99,8 +98,86 @@ GST_START_TEST (test_hammer_bus) gst_object_unref ((GstObject *) test_bus); } -GST_END_TEST Suite * -gstbus_suite (void) +GST_END_TEST static gboolean +message_func_eos (GstBus * bus, GstMessage * message, gpointer data) +{ + const GstStructure *s; + gint i; + + g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE); + + GST_DEBUG ("got EOS message"); + + s = gst_message_get_structure (message); + if (!gst_structure_get_int (s, "msg_id", &i)) + g_critical ("Invalid message"); + + return i != 9; +} + +static gboolean +message_func_app (GstBus * bus, GstMessage * message, gpointer data) +{ + const GstStructure *s; + gint i; + + g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION, + FALSE); + + GST_DEBUG ("got APP message"); + + s = gst_message_get_structure (message); + if (!gst_structure_get_int (s, "msg_id", &i)) + g_critical ("Invalid message"); + + return i != 9; +} + +static gboolean +send_messages (gpointer data) +{ + GstMessage *m; + GstStructure *s; + gint i; + + for (i = 0; i < 10; i++) { + s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL); + m = gst_message_new_application (NULL, s); + gst_bus_post (test_bus, m); + s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL); + m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s); + gst_bus_post (test_bus, m); + } + + return FALSE; +} + +/* test id adding two watches for different message types calls the + * respective callbacks. */ +GST_START_TEST (test_watch) +{ + guint id1, id2; + + test_bus = gst_bus_new (); + + main_loop = g_main_loop_new (NULL, FALSE); + + id2 = gst_bus_add_watch (test_bus, GST_MESSAGE_EOS, message_func_eos, NULL); + id1 = + gst_bus_add_watch (test_bus, GST_MESSAGE_APPLICATION, message_func_app, + NULL); + + g_idle_add ((GSourceFunc) send_messages, NULL); + while (g_main_context_pending (NULL)) + g_main_context_iteration (NULL, FALSE); + + g_source_remove (id1); + g_source_remove (id2); + g_main_loop_unref (main_loop); + + gst_object_unref ((GstObject *) test_bus); +} +GST_END_TEST Suite * gstbus_suite (void) { Suite *s = suite_create ("GstBus"); TCase *tc_chain = tcase_create ("stresstest"); @@ -109,6 +186,7 @@ gstbus_suite (void) suite_add_tcase (s, tc_chain); tcase_add_test (tc_chain, test_hammer_bus); + tcase_add_test (tc_chain, test_watch); return s; } diff --git a/tests/check/gst/gstpipeline.c b/tests/check/gst/gstpipeline.c index 698cabf..ab68db1 100644 --- a/tests/check/gst/gstpipeline.c +++ b/tests/check/gst/gstpipeline.c @@ -87,15 +87,15 @@ GST_START_TEST (test_async_state_change_fake) while (!done) { GstMessage *message; GstState old, new; - GstMessageType type; - - type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); - message = gst_bus_pop (bus); - gst_message_parse_state_changed (message, &old, &new); - GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new); - if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING) - done = TRUE; - gst_message_unref (message); + + message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); + if (message) { + gst_message_parse_state_changed (message, &old, &new); + GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new); + if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING) + done = TRUE; + gst_message_unref (message); + } } g_object_set (G_OBJECT (pipeline), "play-timeout", 3 * GST_SECOND, NULL); diff --git a/tests/check/pipelines/cleanup.c b/tests/check/pipelines/cleanup.c index 6872447..5024db4 100644 --- a/tests/check/pipelines/cleanup.c +++ b/tests/check/pipelines/cleanup.c @@ -48,11 +48,14 @@ run_pipeline (GstElement * pipe, gchar * descr, gst_element_set_state (pipe, GST_STATE_PLAYING); while (1) { - revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2); + GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2); - /* always have to pop the message before getting back into poll */ - if (revent != GST_MESSAGE_UNKNOWN) - gst_message_unref (gst_bus_pop (bus)); + if (message) { + revent = GST_MESSAGE_TYPE (message); + gst_message_unref (message); + } else { + revent = GST_MESSAGE_UNKNOWN; + } if (revent == tevent) { break; diff --git a/tests/check/pipelines/simple-launch-lines.c b/tests/check/pipelines/simple-launch-lines.c index 293e887..c40b88f 100644 --- a/tests/check/pipelines/simple-launch-lines.c +++ b/tests/check/pipelines/simple-launch-lines.c @@ -59,11 +59,14 @@ run_pipeline (GstElement * pipe, gchar * descr, } while (1) { - revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2); + GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2); - /* always have to pop the message before getting back into poll */ - if (revent != GST_MESSAGE_UNKNOWN) - gst_message_unref (gst_bus_pop (bus)); + if (message) { + revent = GST_MESSAGE_TYPE (message); + gst_message_unref (message); + } else { + revent = GST_MESSAGE_UNKNOWN; + } if (revent == tevent) { break; @@ -138,6 +141,7 @@ GST_START_TEST (test_stop_from_app) GstElement *fakesrc, *fakesink, *pipeline; GstBus *bus; GstMessageType revent; + GstMessage *message; assert_live_count (GST_TYPE_BUFFER, 0); @@ -159,17 +163,22 @@ GST_START_TEST (test_stop_from_app) g_assert (bus); /* will time out after half a second */ - revent = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2); - + message = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2); + if (message) { + revent = GST_MESSAGE_TYPE (message); + gst_message_unref (message); + } else { + revent = GST_MESSAGE_UNKNOWN; + } g_return_if_fail (revent == GST_MESSAGE_APPLICATION); - gst_message_unref (gst_bus_pop (bus)); gst_element_set_state (pipeline, GST_STATE_NULL); gst_object_unref (pipeline); assert_live_count (GST_TYPE_BUFFER, 0); } -GST_END_TEST Suite * simple_launch_lines_suite (void) +GST_END_TEST Suite * +simple_launch_lines_suite (void) { Suite *s = suite_create ("Pipelines"); TCase *tc_chain = tcase_create ("linear"); diff --git a/tools/gst-launch.c b/tools/gst-launch.c index f91abdf..448d8cd 100644 --- a/tools/gst-launch.c +++ b/tools/gst-launch.c @@ -356,7 +356,6 @@ static gboolean event_loop (GstElement * pipeline, gboolean blocking) { GstBus *bus; - GstMessageType revent; GstMessage *message = NULL; bus = gst_element_get_bus (GST_ELEMENT (pipeline)); @@ -364,17 +363,14 @@ event_loop (GstElement * pipeline, gboolean blocking) g_timeout_add (50, (GSourceFunc) check_intr, pipeline); while (TRUE) { - revent = gst_bus_poll (bus, GST_MESSAGE_ANY, blocking ? -1 : 0); + message = gst_bus_poll (bus, GST_MESSAGE_ANY, blocking ? -1 : 0); /* if the poll timed out, only when !blocking */ - if (revent == GST_MESSAGE_UNKNOWN) { + if (message == NULL) { gst_object_unref (bus); return FALSE; } - message = gst_bus_pop (bus); - g_return_val_if_fail (message != NULL, TRUE); - if (messages) { const GstStructure *s; @@ -390,7 +386,7 @@ event_loop (GstElement * pipeline, gboolean blocking) } } - switch (revent) { + switch (GST_MESSAGE_TYPE (message)) { case GST_MESSAGE_EOS: g_print (_ ("Got EOS from element \"%s\".\n"), diff --git a/tools/gst-md5sum.c b/tools/gst-md5sum.c index ac2cbe1..a126725 100644 --- a/tools/gst-md5sum.c +++ b/tools/gst-md5sum.c @@ -12,18 +12,16 @@ static gboolean event_loop (GstElement * pipeline) { GstBus *bus; - GstMessageType revent; GstMessage *message = NULL; bus = gst_element_get_bus (GST_ELEMENT (pipeline)); while (TRUE) { - revent = gst_bus_poll (bus, GST_MESSAGE_ANY, -1); + message = gst_bus_poll (bus, GST_MESSAGE_ANY, -1); - message = gst_bus_pop (bus); g_return_val_if_fail (message != NULL, TRUE); - switch (revent) { + switch (GST_MESSAGE_TYPE (message)) { case GST_MESSAGE_EOS: gst_message_unref (message); return FALSE; -- 2.7.4