+2005-09-19 Wim Taymans <wim@fluendo.com>
+
+ * check/gst/gstbin.c: (pop_messages), (GST_START_TEST):
+ * check/gst/gstbus.c: (message_func_eos), (message_func_app),
+ (send_messages), (GST_START_TEST), (gstbus_suite):
+ * check/gst/gstpipeline.c: (GST_START_TEST):
+ * check/pipelines/cleanup.c: (run_pipeline):
+ * check/pipelines/simple_launch_lines.c: (run_pipeline),
+ (GST_START_TEST):
+ * gst/gstbus.c: (gst_bus_have_pending), (gst_bus_source_prepare),
+ (gst_bus_source_check), (gst_bus_source_dispatch),
+ (gst_bus_create_watch), (gst_bus_add_watch_full),
+ (gst_bus_add_watch), (poll_func), (poll_timeout), (gst_bus_poll):
+ * gst/gstbus.h:
+ * tools/gst-launch.c: (event_loop):
+ * tools/gst-md5sum.c: (event_loop):
+ GstBusHandler -> GstBusFunc, return value has the same meaning as
+ any other GSource (FALSE == remove source).
+ _add_watch() and _add_watch_full() now take a MessageType mask to
+ only handle specific types of messages.
+ _poll() returns the GstMessage instead of the message type to avoid
+ race conditions.
+ _have_pending() takes a MessageType mask now too.
+ Added testsuite for multiple bus watches.
+ Fix testsuites and applications for new bus API.
+
2005-09-19 Thomas Vander Stichele <thomas at apestaart dot org>
* check/Makefile.am:
GST_DEBUG ("popping %d messages", count);
for (i = 0; i < count; ++i) {
- fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
+ message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+
+ fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
- message = gst_bus_pop (bus);
gst_message_unref (message);
}
GST_DEBUG ("popped %d messages", count);
ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
/* get and unref the message, causing a decref on the bin */
- fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED,
- -1) == GST_MESSAGE_STATE_CHANGED,
- "did not get GST_MESSAGE_STATE_CHANGED");
+ message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+
+ fail_unless (message && GST_MESSAGE_TYPE (message)
+ == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
- message = gst_bus_pop (bus);
gst_message_unref (message);
ASSERT_OBJECT_REFCOUNT (bin, "bin", 1);
ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
/* get and unref the message, causing a decref on the src */
- fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
+ message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+ fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
- message = gst_bus_pop (bus);
fail_unless (message->src == GST_OBJECT (src));
gst_message_unref (message);
ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
/* get and unref message 2, causing a decref on the bin */
- fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
+ message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+ fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
- message = gst_bus_pop (bus);
fail_unless (message->src == GST_OBJECT (bin));
gst_message_unref (message);
pop_messages (bus, 5);
- fail_unless (gst_bus_have_pending (bus) == FALSE,
+ fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
"Unexpected messages on bus");
gst_bin_watch_for_state_change (GST_BIN (bin));
/* should get the bin's state change message now */
pop_messages (bus, 1);
- fail_unless (gst_bus_have_pending (bus) == FALSE,
+ fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
"Unexpected messages on bus");
fail_unless (gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING)
pop_messages (bus, 3);
- fail_unless (gst_bus_have_pending (bus) == FALSE,
+ fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
"Unexpected messages on bus");
/* setting bin to NULL flushes the bus automatically */
* Boston, MA 02111-1307, USA.
*/
-
#include <gst/check/gstcheck.h>
-
static GstBus *test_bus = NULL;
+static GMainLoop *main_loop;
#define NUM_MESSAGES 1000
#define NUM_THREADS 10
gst_object_unref ((GstObject *) test_bus);
}
-GST_END_TEST Suite *
-gstbus_suite (void)
+GST_END_TEST static gboolean
+message_func_eos (GstBus * bus, GstMessage * message, gpointer data)
+{
+ const GstStructure *s;
+ gint i;
+
+ g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE);
+
+ GST_DEBUG ("got EOS message");
+
+ s = gst_message_get_structure (message);
+ if (!gst_structure_get_int (s, "msg_id", &i))
+ g_critical ("Invalid message");
+
+ return i != 9;
+}
+
+static gboolean
+message_func_app (GstBus * bus, GstMessage * message, gpointer data)
+{
+ const GstStructure *s;
+ gint i;
+
+ g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION,
+ FALSE);
+
+ GST_DEBUG ("got APP message");
+
+ s = gst_message_get_structure (message);
+ if (!gst_structure_get_int (s, "msg_id", &i))
+ g_critical ("Invalid message");
+
+ return i != 9;
+}
+
+static gboolean
+send_messages (gpointer data)
+{
+ GstMessage *m;
+ GstStructure *s;
+ gint i;
+
+ for (i = 0; i < 10; i++) {
+ s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
+ m = gst_message_new_application (NULL, s);
+ gst_bus_post (test_bus, m);
+ s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
+ m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s);
+ gst_bus_post (test_bus, m);
+ }
+
+ return FALSE;
+}
+
+/* test id adding two watches for different message types calls the
+ * respective callbacks. */
+GST_START_TEST (test_watch)
+{
+ guint id1, id2;
+
+ test_bus = gst_bus_new ();
+
+ main_loop = g_main_loop_new (NULL, FALSE);
+
+ id2 = gst_bus_add_watch (test_bus, GST_MESSAGE_EOS, message_func_eos, NULL);
+ id1 =
+ gst_bus_add_watch (test_bus, GST_MESSAGE_APPLICATION, message_func_app,
+ NULL);
+
+ g_idle_add ((GSourceFunc) send_messages, NULL);
+ while (g_main_context_pending (NULL))
+ g_main_context_iteration (NULL, FALSE);
+
+ g_source_remove (id1);
+ g_source_remove (id2);
+ g_main_loop_unref (main_loop);
+
+ gst_object_unref ((GstObject *) test_bus);
+}
+GST_END_TEST Suite * gstbus_suite (void)
{
Suite *s = suite_create ("GstBus");
TCase *tc_chain = tcase_create ("stresstest");
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_hammer_bus);
+ tcase_add_test (tc_chain, test_watch);
return s;
}
while (!done) {
GstMessage *message;
GstState old, new;
- GstMessageType type;
-
- type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
- message = gst_bus_pop (bus);
- gst_message_parse_state_changed (message, &old, &new);
- GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
- if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
- done = TRUE;
- gst_message_unref (message);
+
+ message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+ if (message) {
+ gst_message_parse_state_changed (message, &old, &new);
+ GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
+ if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
+ done = TRUE;
+ gst_message_unref (message);
+ }
}
g_object_set (G_OBJECT (pipeline), "play-timeout", 3 * GST_SECOND, NULL);
gst_element_set_state (pipe, GST_STATE_PLAYING);
while (1) {
- revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
+ GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
- /* always have to pop the message before getting back into poll */
- if (revent != GST_MESSAGE_UNKNOWN)
- gst_message_unref (gst_bus_pop (bus));
+ if (message) {
+ revent = GST_MESSAGE_TYPE (message);
+ gst_message_unref (message);
+ } else {
+ revent = GST_MESSAGE_UNKNOWN;
+ }
if (revent == tevent) {
break;
}
while (1) {
- revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
+ GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
- /* always have to pop the message before getting back into poll */
- if (revent != GST_MESSAGE_UNKNOWN)
- gst_message_unref (gst_bus_pop (bus));
+ if (message) {
+ revent = GST_MESSAGE_TYPE (message);
+ gst_message_unref (message);
+ } else {
+ revent = GST_MESSAGE_UNKNOWN;
+ }
if (revent == tevent) {
break;
GstElement *fakesrc, *fakesink, *pipeline;
GstBus *bus;
GstMessageType revent;
+ GstMessage *message;
assert_live_count (GST_TYPE_BUFFER, 0);
g_assert (bus);
/* will time out after half a second */
- revent = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2);
-
+ message = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2);
+ if (message) {
+ revent = GST_MESSAGE_TYPE (message);
+ gst_message_unref (message);
+ } else {
+ revent = GST_MESSAGE_UNKNOWN;
+ }
g_return_if_fail (revent == GST_MESSAGE_APPLICATION);
- gst_message_unref (gst_bus_pop (bus));
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (pipeline);
assert_live_count (GST_TYPE_BUFFER, 0);
}
-GST_END_TEST Suite * simple_launch_lines_suite (void)
+GST_END_TEST Suite *
+simple_launch_lines_suite (void)
{
Suite *s = suite_create ("Pipelines");
TCase *tc_chain = tcase_create ("linear");
* up to the specified timeout value until one of the specified messages types
* is posted on the bus. The application can then _pop() the messages from the
* bus to handle them.
- * Alternatively the application can register an asynchronous bus handler using
- * gst_bus_add_watch_full() orgst_bus_add_watch(). This handler will receive
+ * Alternatively the application can register an asynchronous bus function using
+ * gst_bus_add_watch_full() orgst_bus_add_watch(). This function will receive
* messages a short while after they have been posted.
*
* It is also possible to get messages from the bus without any thread
* message on the bus. This should only be used if the application is able
* to deal with messages from different threads.
*
- * It is important to make sure that every message is popped from the bus at
- * some point in time. Otherwise it will be presented to the watches (#GSource
- * elements) again and again. One way to implement it is having one watch with a
- * low priority (see gst_add_watch_full()) that pops all messages.
- *
- * Every #GstPipeline has one bus.
+ * Every #GstBin has one bus.
*/
#include <errno.h>
/**
* gst_bus_have_pending:
* @bus: a #GstBus to check
+ * @events: a mask of #GstMessageType, representing the set of message types to
+ * watch for.
*
- * Check if there are pending messages on the bus that should be
- * handled.
+ * Check if there are pending messages on the bus of the given types that
+ * should be handled.
*
* Returns: TRUE if there are messages on the bus to be handled.
*
* MT safe.
*/
gboolean
-gst_bus_have_pending (GstBus * bus)
+gst_bus_have_pending (GstBus * bus, GstMessageType events)
{
- gint length;
+ GstMessage *message;
+ gboolean result;
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
g_mutex_lock (bus->queue_lock);
- length = g_queue_get_length (bus->queue);
+ /* see if there is a message on the bus that satisfies the
+ * event mask */
+ message = g_queue_peek_head (bus->queue);
+ if (message)
+ result = (GST_MESSAGE_TYPE (message) & events) != 0;
+ else
+ result = FALSE;
g_mutex_unlock (bus->queue_lock);
- return (length > 0);
+ return result;
}
/**
{
GSource source;
GstBus *bus;
+ GstMessageType events;
} GstBusSource;
static gboolean
gst_bus_source_prepare (GSource * source, gint * timeout)
{
+ GstBusSource *bsrc = (GstBusSource *) source;
+
*timeout = -1;
- return gst_bus_have_pending (((GstBusSource *) source)->bus);
+ return gst_bus_have_pending (bsrc->bus, bsrc->events);
}
static gboolean
gst_bus_source_check (GSource * source)
{
- return gst_bus_have_pending (((GstBusSource *) source)->bus);
+ GstBusSource *bsrc = (GstBusSource *) source;
+
+ return gst_bus_have_pending (bsrc->bus, bsrc->events);
}
static gboolean
gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
gpointer user_data)
{
- GstBusHandler handler = (GstBusHandler) callback;
+ GstBusFunc handler = (GstBusFunc) callback;
GstBusSource *bsource = (GstBusSource *) source;
GstMessage *message;
- gboolean needs_pop = TRUE;
+ gboolean keep;
GstBus *bus;
g_return_val_if_fail (bsource != NULL, FALSE);
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
- message = gst_bus_peek (bus);
-
- GST_DEBUG_OBJECT (bus, "source %p have message %p", source, message);
-
- g_return_val_if_fail (message != NULL, TRUE);
+ message = gst_bus_pop (bus);
+ g_return_val_if_fail (message != NULL, FALSE);
if (!handler)
goto no_handler;
GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
- needs_pop = handler (bus, message, user_data);
+ keep = handler (bus, message, user_data);
gst_message_unref (message);
- GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, needs_pop);
- if (needs_pop) {
- message = gst_bus_pop (bus);
- if (message) {
- gst_message_unref (message);
- } else {
- /* after executing the handler, the app could have disposed
- * the pipeline and set the bus to flushing. It is possible
- * then that there are no more messages on the bus. this is
- * not a problem. */
- GST_DEBUG ("handler requested pop but no message on the bus");
- }
- }
- return TRUE;
+ GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, keep);
+
+ return keep;
no_handler:
{
/**
* gst_bus_create_watch:
* @bus: a #GstBus to create the watch for
+ * @events: a mask of #GstMessageType, representing the set of message types to
+ * watch for.
*
- * Create watch for this bus.
+ * Create watch for this bus. The source will only act on messages of the
+ * given types, messages of other types will simply remain on the bus and
+ * this GSource will not be dispatched again before the message is popped off
+ * the bus. For this reason one typically has a low priority GSource that
+ * pops all remaining messages from the bus not handled by the other GSources.
*
* Returns: A #GSource that can be added to a mainloop.
*/
GSource *
-gst_bus_create_watch (GstBus * bus)
+gst_bus_create_watch (GstBus * bus, GstMessageType events)
{
GstBusSource *source;
sizeof (GstBusSource));
gst_object_ref (bus);
source->bus = bus;
+ source->events = events;
return (GSource *) source;
}
* gst_bus_add_watch_full:
* @bus: a #GstBus to create the watch for.
* @priority: The priority of the watch.
- * @handler: A function to call when a message is received.
- * @user_data: user data passed to @handler.
+ * @events: a mask of #GstMessageType, representing the set of message types to
+ * watch for.
+ * @func: A function to call when a message is received.
+ * @user_data: user data passed to @func.
* @notify: the function to call when the source is removed.
*
- * Adds the bus to the mainloop with the given priority. If the handler returns
- * TRUE, the message will then be popped off the queue. When the handler is
- * called, the message belongs to the caller; if you want to keep a copy of it,
- * call gst_message_ref before leaving the handler.
+ * Adds the bus to the mainloop with the given priority. If the func returns
+ * FALSE, the func will be removed.
+ *
+ * When the func is called, the message belongs to the caller; if you want to
+ * keep a copy of it, call gst_message_ref before leaving the func.
*
* Returns: The event source id.
*
* MT safe.
*/
guint
-gst_bus_add_watch_full (GstBus * bus, gint priority,
- GstBusHandler handler, gpointer user_data, GDestroyNotify notify)
+gst_bus_add_watch_full (GstBus * bus, gint priority, GstMessageType events,
+ GstBusFunc func, gpointer user_data, GDestroyNotify notify)
{
guint id;
GSource *source;
g_return_val_if_fail (GST_IS_BUS (bus), 0);
- source = gst_bus_create_watch (bus);
+ source = gst_bus_create_watch (bus, events);
if (priority != G_PRIORITY_DEFAULT)
g_source_set_priority (source, priority);
- g_source_set_callback (source, (GSourceFunc) handler, user_data, notify);
+ g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
id = g_source_attach (source, NULL);
g_source_unref (source);
/**
* gst_bus_add_watch:
* @bus: a #GstBus to create the watch for
- * @handler: A function to call when a message is received.
- * @user_data: user data passed to @handler.
+ * @events: a mask of #GstMessageType, representing the set of message types to
+ * watch for.
+ * @func: A function to call when a message is received.
+ * @user_data: user data passed to @func.
*
* Adds the bus to the mainloop with the default priority.
*
* MT safe.
*/
guint
-gst_bus_add_watch (GstBus * bus, GstBusHandler handler, gpointer user_data)
+gst_bus_add_watch (GstBus * bus, GstMessageType events, GstBusFunc func,
+ gpointer user_data)
{
- return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, handler, user_data,
- NULL);
+ return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, events, func,
+ user_data, NULL);
}
typedef struct
guint timeout_id;
gboolean source_running;
GstMessageType events;
- GstMessageType revent;
+ GstMessage *message;
} GstBusPollData;
static gboolean
-poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
+poll_func (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
{
if (!g_main_loop_is_running (poll_data->loop))
- return FALSE;
+ return TRUE;
if (GST_MESSAGE_TYPE (message) & poll_data->events) {
- poll_data->revent = GST_MESSAGE_TYPE (message);
+ g_return_val_if_fail (poll_data->message == NULL, FALSE);
+ /* keep ref to message */
+ poll_data->message = gst_message_ref (message);
g_main_loop_quit (poll_data->loop);
-
- /* keep the message on the queue */
- return FALSE;
} else {
- /* pop and unref the message */
- return TRUE;
+ /* don't remove the source. */
}
+ /* we always keep the source alive so that we don't accidentialy
+ * free the poll_data */
+ return TRUE;
}
static gboolean
{
g_main_loop_quit (poll_data->loop);
- /* returning FALSE will remove the source id */
- return FALSE;
+ /* we don't remove the GSource as this would free our poll_data,
+ * which we still need */
+ return TRUE;
}
static void
*
* This function will enter the default mainloop while polling.
*
- * Returns: The type of the message that was received, or GST_MESSAGE_UNKNOWN if
- * the poll timed out. The message will remain in the bus queue; you will need
- * to gst_bus_pop() it off before entering gst_bus_poll() again.
+ * Returns: The message that was received, or NULL if the poll timed out.
+ * The message is taken from the bus and needs to be unreffed after usage.
*/
-GstMessageType
+GstMessage *
gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
{
GstBusPollData *poll_data;
- GstMessageType ret;
+ GstMessage *ret;
guint id;
poll_data = g_new0 (GstBusPollData, 1);
- g_return_val_if_fail (poll_data != NULL, GST_MESSAGE_UNKNOWN);
-
poll_data->source_running = TRUE;
poll_data->loop = g_main_loop_new (NULL, FALSE);
poll_data->events = events;
- poll_data->revent = GST_MESSAGE_UNKNOWN;
+ poll_data->message = NULL;
if (timeout >= 0)
poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
else
poll_data->timeout_id = 0;
- id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE,
- (GstBusHandler) poll_handler, poll_data, (GDestroyNotify) poll_destroy);
+ id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE, GST_MESSAGE_ANY,
+ (GstBusFunc) poll_func, poll_data, (GDestroyNotify) poll_destroy);
+
g_main_loop_run (poll_data->loop);
- ret = poll_data->revent;
+ /* holds a ref */
+ ret = poll_data->message;
if (poll_data->timeout_id)
g_source_remove (poll_data->timeout_id);
/* poll_data may get destroyed at any time now */
g_source_remove (id);
- GST_DEBUG_OBJECT (bus, "finished poll with messagetype %d", ret);
+ GST_DEBUG_OBJECT (bus, "finished poll with message %p", ret);
return ret;
}
* @data: user data that has been given, when registering the handler
*
* Handler will be invoked synchronously, when a new message has been injected
- * into the bus.
+ * into the bus. This function is mostly used internally. Only one sync handler
+ * can be attached to a given bus.
*
* Returns: #GstBusSyncReply stating what to do with the message
*/
-typedef GstBusSyncReply (*GstBusSyncHandler) (GstBus * bus, GstMessage * message, gpointer data);
+typedef GstBusSyncReply (*GstBusSyncHandler) (GstBus * bus, GstMessage * message, gpointer data);
+
/**
- * GstBusHandler:
+ * GstBusFunc:
* @bus: the #GstBus that sent the message
* @message: the #GstMessage
* @data: user data that has been given, when registering the handler
*
- * Handler will be invoked asynchronously, after a new message has been injected
- * into the bus. Return %TRUE if the message has been handled. It will then be
- * taken from the bus and _unref()'ed.
+ * Specifies the type of function passed to #gst_bus_add_watch() or
+ * #gst_bus_add_watch_full(), which is called from the mainloop when a message
+ * is available on the bus.
+ *
+ * The message passed to the function will be unreffed after execution of this
+ * function so it should not be freed in the function.
*
- * Returns: %TRUE if message should be taken from the bus
+ * Returns: %FALSE if the event source should be removed.
*/
-typedef gboolean (*GstBusHandler) (GstBus * bus, GstMessage * message, gpointer data);
+typedef gboolean (*GstBusFunc) (GstBus * bus, GstMessage * message, gpointer data);
struct _GstBus
{
gboolean gst_bus_post (GstBus * bus, GstMessage * message);
-gboolean gst_bus_have_pending (GstBus * bus);
+gboolean gst_bus_have_pending (GstBus * bus, GstMessageType events);
GstMessage * gst_bus_peek (GstBus * bus);
GstMessage * gst_bus_pop (GstBus * bus);
void gst_bus_set_flushing (GstBus * bus, gboolean flushing);
void gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func,
gpointer data);
-GSource * gst_bus_create_watch (GstBus * bus);
+GSource * gst_bus_create_watch (GstBus * bus, GstMessageType events);
guint gst_bus_add_watch_full (GstBus * bus,
gint priority,
- GstBusHandler handler,
+ GstMessageType events,
+ GstBusFunc func,
gpointer user_data,
GDestroyNotify notify);
guint gst_bus_add_watch (GstBus * bus,
- GstBusHandler handler,
+ GstMessageType events,
+ GstBusFunc func,
gpointer user_data);
-GstMessageType gst_bus_poll (GstBus *bus, GstMessageType events,
+
+GstMessage* gst_bus_poll (GstBus *bus, GstMessageType events,
GstClockTimeDiff timeout);
G_END_DECLS
GST_DEBUG ("popping %d messages", count);
for (i = 0; i < count; ++i) {
- fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
+ message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+
+ fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
- message = gst_bus_pop (bus);
gst_message_unref (message);
}
GST_DEBUG ("popped %d messages", count);
ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
/* get and unref the message, causing a decref on the bin */
- fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED,
- -1) == GST_MESSAGE_STATE_CHANGED,
- "did not get GST_MESSAGE_STATE_CHANGED");
+ message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+
+ fail_unless (message && GST_MESSAGE_TYPE (message)
+ == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
- message = gst_bus_pop (bus);
gst_message_unref (message);
ASSERT_OBJECT_REFCOUNT (bin, "bin", 1);
ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
/* get and unref the message, causing a decref on the src */
- fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
+ message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+ fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
- message = gst_bus_pop (bus);
fail_unless (message->src == GST_OBJECT (src));
gst_message_unref (message);
ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
/* get and unref message 2, causing a decref on the bin */
- fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
+ message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+ fail_unless (message && GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
- message = gst_bus_pop (bus);
fail_unless (message->src == GST_OBJECT (bin));
gst_message_unref (message);
pop_messages (bus, 5);
- fail_unless (gst_bus_have_pending (bus) == FALSE,
+ fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
"Unexpected messages on bus");
gst_bin_watch_for_state_change (GST_BIN (bin));
/* should get the bin's state change message now */
pop_messages (bus, 1);
- fail_unless (gst_bus_have_pending (bus) == FALSE,
+ fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
"Unexpected messages on bus");
fail_unless (gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING)
pop_messages (bus, 3);
- fail_unless (gst_bus_have_pending (bus) == FALSE,
+ fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
"Unexpected messages on bus");
/* setting bin to NULL flushes the bus automatically */
* Boston, MA 02111-1307, USA.
*/
-
#include <gst/check/gstcheck.h>
-
static GstBus *test_bus = NULL;
+static GMainLoop *main_loop;
#define NUM_MESSAGES 1000
#define NUM_THREADS 10
gst_object_unref ((GstObject *) test_bus);
}
-GST_END_TEST Suite *
-gstbus_suite (void)
+GST_END_TEST static gboolean
+message_func_eos (GstBus * bus, GstMessage * message, gpointer data)
+{
+ const GstStructure *s;
+ gint i;
+
+ g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE);
+
+ GST_DEBUG ("got EOS message");
+
+ s = gst_message_get_structure (message);
+ if (!gst_structure_get_int (s, "msg_id", &i))
+ g_critical ("Invalid message");
+
+ return i != 9;
+}
+
+static gboolean
+message_func_app (GstBus * bus, GstMessage * message, gpointer data)
+{
+ const GstStructure *s;
+ gint i;
+
+ g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION,
+ FALSE);
+
+ GST_DEBUG ("got APP message");
+
+ s = gst_message_get_structure (message);
+ if (!gst_structure_get_int (s, "msg_id", &i))
+ g_critical ("Invalid message");
+
+ return i != 9;
+}
+
+static gboolean
+send_messages (gpointer data)
+{
+ GstMessage *m;
+ GstStructure *s;
+ gint i;
+
+ for (i = 0; i < 10; i++) {
+ s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
+ m = gst_message_new_application (NULL, s);
+ gst_bus_post (test_bus, m);
+ s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
+ m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s);
+ gst_bus_post (test_bus, m);
+ }
+
+ return FALSE;
+}
+
+/* test id adding two watches for different message types calls the
+ * respective callbacks. */
+GST_START_TEST (test_watch)
+{
+ guint id1, id2;
+
+ test_bus = gst_bus_new ();
+
+ main_loop = g_main_loop_new (NULL, FALSE);
+
+ id2 = gst_bus_add_watch (test_bus, GST_MESSAGE_EOS, message_func_eos, NULL);
+ id1 =
+ gst_bus_add_watch (test_bus, GST_MESSAGE_APPLICATION, message_func_app,
+ NULL);
+
+ g_idle_add ((GSourceFunc) send_messages, NULL);
+ while (g_main_context_pending (NULL))
+ g_main_context_iteration (NULL, FALSE);
+
+ g_source_remove (id1);
+ g_source_remove (id2);
+ g_main_loop_unref (main_loop);
+
+ gst_object_unref ((GstObject *) test_bus);
+}
+GST_END_TEST Suite * gstbus_suite (void)
{
Suite *s = suite_create ("GstBus");
TCase *tc_chain = tcase_create ("stresstest");
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_hammer_bus);
+ tcase_add_test (tc_chain, test_watch);
return s;
}
while (!done) {
GstMessage *message;
GstState old, new;
- GstMessageType type;
-
- type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
- message = gst_bus_pop (bus);
- gst_message_parse_state_changed (message, &old, &new);
- GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
- if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
- done = TRUE;
- gst_message_unref (message);
+
+ message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+ if (message) {
+ gst_message_parse_state_changed (message, &old, &new);
+ GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
+ if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
+ done = TRUE;
+ gst_message_unref (message);
+ }
}
g_object_set (G_OBJECT (pipeline), "play-timeout", 3 * GST_SECOND, NULL);
gst_element_set_state (pipe, GST_STATE_PLAYING);
while (1) {
- revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
+ GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
- /* always have to pop the message before getting back into poll */
- if (revent != GST_MESSAGE_UNKNOWN)
- gst_message_unref (gst_bus_pop (bus));
+ if (message) {
+ revent = GST_MESSAGE_TYPE (message);
+ gst_message_unref (message);
+ } else {
+ revent = GST_MESSAGE_UNKNOWN;
+ }
if (revent == tevent) {
break;
}
while (1) {
- revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
+ GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
- /* always have to pop the message before getting back into poll */
- if (revent != GST_MESSAGE_UNKNOWN)
- gst_message_unref (gst_bus_pop (bus));
+ if (message) {
+ revent = GST_MESSAGE_TYPE (message);
+ gst_message_unref (message);
+ } else {
+ revent = GST_MESSAGE_UNKNOWN;
+ }
if (revent == tevent) {
break;
GstElement *fakesrc, *fakesink, *pipeline;
GstBus *bus;
GstMessageType revent;
+ GstMessage *message;
assert_live_count (GST_TYPE_BUFFER, 0);
g_assert (bus);
/* will time out after half a second */
- revent = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2);
-
+ message = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2);
+ if (message) {
+ revent = GST_MESSAGE_TYPE (message);
+ gst_message_unref (message);
+ } else {
+ revent = GST_MESSAGE_UNKNOWN;
+ }
g_return_if_fail (revent == GST_MESSAGE_APPLICATION);
- gst_message_unref (gst_bus_pop (bus));
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (pipeline);
assert_live_count (GST_TYPE_BUFFER, 0);
}
-GST_END_TEST Suite * simple_launch_lines_suite (void)
+GST_END_TEST Suite *
+simple_launch_lines_suite (void)
{
Suite *s = suite_create ("Pipelines");
TCase *tc_chain = tcase_create ("linear");
event_loop (GstElement * pipeline, gboolean blocking)
{
GstBus *bus;
- GstMessageType revent;
GstMessage *message = NULL;
bus = gst_element_get_bus (GST_ELEMENT (pipeline));
g_timeout_add (50, (GSourceFunc) check_intr, pipeline);
while (TRUE) {
- revent = gst_bus_poll (bus, GST_MESSAGE_ANY, blocking ? -1 : 0);
+ message = gst_bus_poll (bus, GST_MESSAGE_ANY, blocking ? -1 : 0);
/* if the poll timed out, only when !blocking */
- if (revent == GST_MESSAGE_UNKNOWN) {
+ if (message == NULL) {
gst_object_unref (bus);
return FALSE;
}
- message = gst_bus_pop (bus);
- g_return_val_if_fail (message != NULL, TRUE);
-
if (messages) {
const GstStructure *s;
}
}
- switch (revent) {
+ switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
g_print (_
("Got EOS from element \"%s\".\n"),
event_loop (GstElement * pipeline)
{
GstBus *bus;
- GstMessageType revent;
GstMessage *message = NULL;
bus = gst_element_get_bus (GST_ELEMENT (pipeline));
while (TRUE) {
- revent = gst_bus_poll (bus, GST_MESSAGE_ANY, -1);
+ message = gst_bus_poll (bus, GST_MESSAGE_ANY, -1);
- message = gst_bus_pop (bus);
g_return_val_if_fail (message != NULL, TRUE);
- switch (revent) {
+ switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
gst_message_unref (message);
return FALSE;