GstBusHandler -> GstBusFunc, return value has the same meaning as any other GSource...
authorWim Taymans <wim.taymans@gmail.com>
Mon, 19 Sep 2005 11:18:03 +0000 (11:18 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Mon, 19 Sep 2005 11:18:03 +0000 (11:18 +0000)
Original commit message from CVS:
* check/gst/gstbin.c: (pop_messages), (GST_START_TEST):
* check/gst/gstbus.c: (message_func_eos), (message_func_app),
(send_messages), (GST_START_TEST), (gstbus_suite):
* check/gst/gstpipeline.c: (GST_START_TEST):
* check/pipelines/cleanup.c: (run_pipeline):
* check/pipelines/simple_launch_lines.c: (run_pipeline),
(GST_START_TEST):
* gst/gstbus.c: (gst_bus_have_pending), (gst_bus_source_prepare),
(gst_bus_source_check), (gst_bus_source_dispatch),
(gst_bus_create_watch), (gst_bus_add_watch_full),
(gst_bus_add_watch), (poll_func), (poll_timeout), (gst_bus_poll):
* gst/gstbus.h:
* tools/gst-launch.c: (event_loop):
* tools/gst-md5sum.c: (event_loop):
GstBusHandler -> GstBusFunc, return value has the same meaning as
any other GSource (FALSE == remove source).
_add_watch() and _add_watch_full() now take a MessageType mask to
only handle specific types of messages.
_poll() returns the GstMessage instead of the message type to avoid
race conditions.
_have_pending() takes a MessageType mask now too.
Added testsuite for multiple bus watches.
Fix testsuites and applications for new bus API.

15 files changed:
ChangeLog
check/gst/gstbin.c
check/gst/gstbus.c
check/gst/gstpipeline.c
check/pipelines/cleanup.c
check/pipelines/simple_launch_lines.c
gst/gstbus.c
gst/gstbus.h
tests/check/gst/gstbin.c
tests/check/gst/gstbus.c
tests/check/gst/gstpipeline.c
tests/check/pipelines/cleanup.c
tests/check/pipelines/simple-launch-lines.c
tools/gst-launch.c
tools/gst-md5sum.c

index ec11518..1cb2c0b 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,29 @@
+2005-09-19  Wim Taymans  <wim@fluendo.com>
+
+       * check/gst/gstbin.c: (pop_messages), (GST_START_TEST):
+       * check/gst/gstbus.c: (message_func_eos), (message_func_app),
+       (send_messages), (GST_START_TEST), (gstbus_suite):
+       * check/gst/gstpipeline.c: (GST_START_TEST):
+       * check/pipelines/cleanup.c: (run_pipeline):
+       * check/pipelines/simple_launch_lines.c: (run_pipeline),
+       (GST_START_TEST):
+       * gst/gstbus.c: (gst_bus_have_pending), (gst_bus_source_prepare),
+       (gst_bus_source_check), (gst_bus_source_dispatch),
+       (gst_bus_create_watch), (gst_bus_add_watch_full),
+       (gst_bus_add_watch), (poll_func), (poll_timeout), (gst_bus_poll):
+       * gst/gstbus.h:
+       * tools/gst-launch.c: (event_loop):
+       * tools/gst-md5sum.c: (event_loop):
+       GstBusHandler -> GstBusFunc, return value has the same meaning as
+       any other GSource (FALSE == remove source).
+       _add_watch() and _add_watch_full() now take a MessageType mask to
+       only handle specific types of messages.
+       _poll() returns the GstMessage instead of the message type to avoid
+       race conditions.
+       _have_pending() takes a MessageType mask now too.
+       Added testsuite for multiple bus watches.
+       Fix testsuites and applications for new bus API.
+
 2005-09-19  Thomas Vander Stichele  <thomas at apestaart dot org>
 
        * check/Makefile.am:
index fa704be..16c6f62 100644 (file)
@@ -31,10 +31,11 @@ pop_messages (GstBus * bus, int count)
 
   GST_DEBUG ("popping %d messages", count);
   for (i = 0; i < count; ++i) {
-    fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
+    message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+
+    fail_unless (message && GST_MESSAGE_TYPE (message)
         == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
 
-    message = gst_bus_pop (bus);
     gst_message_unref (message);
   }
   GST_DEBUG ("popped %d messages", count);
@@ -121,11 +122,11 @@ GST_START_TEST (test_message_state_changed)
   ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
 
   /* get and unref the message, causing a decref on the bin */
-  fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED,
-          -1) == GST_MESSAGE_STATE_CHANGED,
-      "did not get GST_MESSAGE_STATE_CHANGED");
+  message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+
+  fail_unless (message && GST_MESSAGE_TYPE (message)
+      == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
 
-  message = gst_bus_pop (bus);
   gst_message_unref (message);
 
   ASSERT_OBJECT_REFCOUNT (bin, "bin", 1);
@@ -166,10 +167,10 @@ GST_START_TEST (test_message_state_changed_child)
   ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
 
   /* get and unref the message, causing a decref on the src */
-  fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
+  message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+  fail_unless (message && GST_MESSAGE_TYPE (message)
       == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
 
-  message = gst_bus_pop (bus);
   fail_unless (message->src == GST_OBJECT (src));
   gst_message_unref (message);
 
@@ -177,10 +178,10 @@ GST_START_TEST (test_message_state_changed_child)
   ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
 
   /* get and unref message 2, causing a decref on the bin */
-  fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
+  message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+  fail_unless (message && GST_MESSAGE_TYPE (message)
       == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
 
-  message = gst_bus_pop (bus);
   fail_unless (message->src == GST_OBJECT (bin));
   gst_message_unref (message);
 
@@ -341,7 +342,7 @@ GST_START_TEST (test_watch_for_state_change)
 
   pop_messages (bus, 5);
 
-  fail_unless (gst_bus_have_pending (bus) == FALSE,
+  fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
       "Unexpected messages on bus");
 
   gst_bin_watch_for_state_change (GST_BIN (bin));
@@ -349,7 +350,7 @@ GST_START_TEST (test_watch_for_state_change)
   /* should get the bin's state change message now */
   pop_messages (bus, 1);
 
-  fail_unless (gst_bus_have_pending (bus) == FALSE,
+  fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
       "Unexpected messages on bus");
 
   fail_unless (gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING)
@@ -364,7 +365,7 @@ GST_START_TEST (test_watch_for_state_change)
 
   pop_messages (bus, 3);
 
-  fail_unless (gst_bus_have_pending (bus) == FALSE,
+  fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
       "Unexpected messages on bus");
 
   /* setting bin to NULL flushes the bus automatically */
index 54ea722..8fb3609 100644 (file)
  * Boston, MA 02111-1307, USA.
  */
 
-
 #include <gst/check/gstcheck.h>
 
-
 static GstBus *test_bus = NULL;
+static GMainLoop *main_loop;
 
 #define NUM_MESSAGES 1000
 #define NUM_THREADS 10
@@ -99,8 +98,86 @@ GST_START_TEST (test_hammer_bus)
 
   gst_object_unref ((GstObject *) test_bus);
 }
-GST_END_TEST Suite *
-gstbus_suite (void)
+GST_END_TEST static gboolean
+message_func_eos (GstBus * bus, GstMessage * message, gpointer data)
+{
+  const GstStructure *s;
+  gint i;
+
+  g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE);
+
+  GST_DEBUG ("got EOS message");
+
+  s = gst_message_get_structure (message);
+  if (!gst_structure_get_int (s, "msg_id", &i))
+    g_critical ("Invalid message");
+
+  return i != 9;
+}
+
+static gboolean
+message_func_app (GstBus * bus, GstMessage * message, gpointer data)
+{
+  const GstStructure *s;
+  gint i;
+
+  g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION,
+      FALSE);
+
+  GST_DEBUG ("got APP message");
+
+  s = gst_message_get_structure (message);
+  if (!gst_structure_get_int (s, "msg_id", &i))
+    g_critical ("Invalid message");
+
+  return i != 9;
+}
+
+static gboolean
+send_messages (gpointer data)
+{
+  GstMessage *m;
+  GstStructure *s;
+  gint i;
+
+  for (i = 0; i < 10; i++) {
+    s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
+    m = gst_message_new_application (NULL, s);
+    gst_bus_post (test_bus, m);
+    s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
+    m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s);
+    gst_bus_post (test_bus, m);
+  }
+
+  return FALSE;
+}
+
+/* test id adding two watches for different message types calls the
+ * respective callbacks. */
+GST_START_TEST (test_watch)
+{
+  guint id1, id2;
+
+  test_bus = gst_bus_new ();
+
+  main_loop = g_main_loop_new (NULL, FALSE);
+
+  id2 = gst_bus_add_watch (test_bus, GST_MESSAGE_EOS, message_func_eos, NULL);
+  id1 =
+      gst_bus_add_watch (test_bus, GST_MESSAGE_APPLICATION, message_func_app,
+      NULL);
+
+  g_idle_add ((GSourceFunc) send_messages, NULL);
+  while (g_main_context_pending (NULL))
+    g_main_context_iteration (NULL, FALSE);
+
+  g_source_remove (id1);
+  g_source_remove (id2);
+  g_main_loop_unref (main_loop);
+
+  gst_object_unref ((GstObject *) test_bus);
+}
+GST_END_TEST Suite * gstbus_suite (void)
 {
   Suite *s = suite_create ("GstBus");
   TCase *tc_chain = tcase_create ("stresstest");
@@ -109,6 +186,7 @@ gstbus_suite (void)
 
   suite_add_tcase (s, tc_chain);
   tcase_add_test (tc_chain, test_hammer_bus);
+  tcase_add_test (tc_chain, test_watch);
   return s;
 }
 
index 698cabf..ab68db1 100644 (file)
@@ -87,15 +87,15 @@ GST_START_TEST (test_async_state_change_fake)
   while (!done) {
     GstMessage *message;
     GstState old, new;
-    GstMessageType type;
-
-    type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
-    message = gst_bus_pop (bus);
-    gst_message_parse_state_changed (message, &old, &new);
-    GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
-    if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
-      done = TRUE;
-    gst_message_unref (message);
+
+    message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+    if (message) {
+      gst_message_parse_state_changed (message, &old, &new);
+      GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
+      if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
+        done = TRUE;
+      gst_message_unref (message);
+    }
   }
 
   g_object_set (G_OBJECT (pipeline), "play-timeout", 3 * GST_SECOND, NULL);
index 6872447..5024db4 100644 (file)
@@ -48,11 +48,14 @@ run_pipeline (GstElement * pipe, gchar * descr,
   gst_element_set_state (pipe, GST_STATE_PLAYING);
 
   while (1) {
-    revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
+    GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
 
-    /* always have to pop the message before getting back into poll */
-    if (revent != GST_MESSAGE_UNKNOWN)
-      gst_message_unref (gst_bus_pop (bus));
+    if (message) {
+      revent = GST_MESSAGE_TYPE (message);
+      gst_message_unref (message);
+    } else {
+      revent = GST_MESSAGE_UNKNOWN;
+    }
 
     if (revent == tevent) {
       break;
index 293e887..c40b88f 100644 (file)
@@ -59,11 +59,14 @@ run_pipeline (GstElement * pipe, gchar * descr,
   }
 
   while (1) {
-    revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
+    GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
 
-    /* always have to pop the message before getting back into poll */
-    if (revent != GST_MESSAGE_UNKNOWN)
-      gst_message_unref (gst_bus_pop (bus));
+    if (message) {
+      revent = GST_MESSAGE_TYPE (message);
+      gst_message_unref (message);
+    } else {
+      revent = GST_MESSAGE_UNKNOWN;
+    }
 
     if (revent == tevent) {
       break;
@@ -138,6 +141,7 @@ GST_START_TEST (test_stop_from_app)
   GstElement *fakesrc, *fakesink, *pipeline;
   GstBus *bus;
   GstMessageType revent;
+  GstMessage *message;
 
   assert_live_count (GST_TYPE_BUFFER, 0);
 
@@ -159,17 +163,22 @@ GST_START_TEST (test_stop_from_app)
   g_assert (bus);
 
   /* will time out after half a second */
-  revent = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2);
-
+  message = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2);
+  if (message) {
+    revent = GST_MESSAGE_TYPE (message);
+    gst_message_unref (message);
+  } else {
+    revent = GST_MESSAGE_UNKNOWN;
+  }
   g_return_if_fail (revent == GST_MESSAGE_APPLICATION);
-  gst_message_unref (gst_bus_pop (bus));
 
   gst_element_set_state (pipeline, GST_STATE_NULL);
   gst_object_unref (pipeline);
 
   assert_live_count (GST_TYPE_BUFFER, 0);
 }
-GST_END_TEST Suite * simple_launch_lines_suite (void)
+GST_END_TEST Suite *
+simple_launch_lines_suite (void)
 {
   Suite *s = suite_create ("Pipelines");
   TCase *tc_chain = tcase_create ("linear");
index d3176e1..38fafc1 100644 (file)
@@ -42,8 +42,8 @@
  * up to the specified timeout value until one of the specified messages types
  * is posted on the bus. The application can then _pop() the messages from the
  * bus to handle them.
- * Alternatively the application can register an asynchronous bus handler using
- * gst_bus_add_watch_full() orgst_bus_add_watch(). This handler will receive
+ * Alternatively the application can register an asynchronous bus function using
+ * gst_bus_add_watch_full() orgst_bus_add_watch(). This function will receive
  * messages a short while after they have been posted.
  * 
  * It is also possible to get messages from the bus without any thread 
  * message on the bus. This should only be used if the application is able
  * to deal with messages from different threads.
  *
- * It is important to make sure that every message is popped from the bus at
- * some point in time. Otherwise it will be presented to the watches (#GSource
- * elements) again and again. One way to implement it is having one watch with a
- * low priority (see gst_add_watch_full()) that pops all messages.
- * 
- * Every #GstPipeline has one bus.
+ * Every #GstBin has one bus.
  */
 
 #include <errno.h>
@@ -324,26 +319,35 @@ is_flushing:
 /**
  * gst_bus_have_pending:
  * @bus: a #GstBus to check
+ * @events: a mask of #GstMessageType, representing the set of message types to
+ * watch for.
  *
- * Check if there are pending messages on the bus that should be 
- * handled.
+ * Check if there are pending messages on the bus of the given types that 
+ * should be handled.
  *
  * Returns: TRUE if there are messages on the bus to be handled.
  *
  * MT safe.
  */
 gboolean
-gst_bus_have_pending (GstBus * bus)
+gst_bus_have_pending (GstBus * bus, GstMessageType events)
 {
-  gint length;
+  GstMessage *message;
+  gboolean result;
 
   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
 
   g_mutex_lock (bus->queue_lock);
-  length = g_queue_get_length (bus->queue);
+  /* see if there is a message on the bus that satisfies the
+   * event mask */
+  message = g_queue_peek_head (bus->queue);
+  if (message)
+    result = (GST_MESSAGE_TYPE (message) & events) != 0;
+  else
+    result = FALSE;
   g_mutex_unlock (bus->queue_lock);
 
-  return (length > 0);
+  return result;
 }
 
 /**
@@ -470,29 +474,34 @@ typedef struct
 {
   GSource source;
   GstBus *bus;
+  GstMessageType events;
 } GstBusSource;
 
 static gboolean
 gst_bus_source_prepare (GSource * source, gint * timeout)
 {
+  GstBusSource *bsrc = (GstBusSource *) source;
+
   *timeout = -1;
-  return gst_bus_have_pending (((GstBusSource *) source)->bus);
+  return gst_bus_have_pending (bsrc->bus, bsrc->events);
 }
 
 static gboolean
 gst_bus_source_check (GSource * source)
 {
-  return gst_bus_have_pending (((GstBusSource *) source)->bus);
+  GstBusSource *bsrc = (GstBusSource *) source;
+
+  return gst_bus_have_pending (bsrc->bus, bsrc->events);
 }
 
 static gboolean
 gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
     gpointer user_data)
 {
-  GstBusHandler handler = (GstBusHandler) callback;
+  GstBusFunc handler = (GstBusFunc) callback;
   GstBusSource *bsource = (GstBusSource *) source;
   GstMessage *message;
-  gboolean needs_pop = TRUE;
+  gboolean keep;
   GstBus *bus;
 
   g_return_val_if_fail (bsource != NULL, FALSE);
@@ -501,34 +510,20 @@ gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
 
   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
 
-  message = gst_bus_peek (bus);
-
-  GST_DEBUG_OBJECT (bus, "source %p have message %p", source, message);
-
-  g_return_val_if_fail (message != NULL, TRUE);
+  message = gst_bus_pop (bus);
+  g_return_val_if_fail (message != NULL, FALSE);
 
   if (!handler)
     goto no_handler;
 
   GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
 
-  needs_pop = handler (bus, message, user_data);
+  keep = handler (bus, message, user_data);
   gst_message_unref (message);
 
-  GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, needs_pop);
-  if (needs_pop) {
-    message = gst_bus_pop (bus);
-    if (message) {
-      gst_message_unref (message);
-    } else {
-      /* after executing the handler, the app could have disposed
-       * the pipeline and set the bus to flushing. It is possible
-       * then that there are no more messages on the bus. this is
-       * not a problem. */
-      GST_DEBUG ("handler requested pop but no message on the bus");
-    }
-  }
-  return TRUE;
+  GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, keep);
+
+  return keep;
 
 no_handler:
   {
@@ -558,13 +553,19 @@ static GSourceFuncs gst_bus_source_funcs = {
 /**
  * gst_bus_create_watch:
  * @bus: a #GstBus to create the watch for
+ * @events: a mask of #GstMessageType, representing the set of message types to
+ * watch for.
  *
- * Create watch for this bus. 
+ * Create watch for this bus. The source will only act on messages of the
+ * given types, messages of other types will simply remain on the bus and 
+ * this GSource will not be dispatched again before the message is popped off
+ * the bus. For this reason one typically has a low priority GSource that
+ * pops all remaining messages from the bus not handled by the other GSources.
  *
  * Returns: A #GSource that can be added to a mainloop.
  */
 GSource *
-gst_bus_create_watch (GstBus * bus)
+gst_bus_create_watch (GstBus * bus, GstMessageType events)
 {
   GstBusSource *source;
 
@@ -574,6 +575,7 @@ gst_bus_create_watch (GstBus * bus)
       sizeof (GstBusSource));
   gst_object_ref (bus);
   source->bus = bus;
+  source->events = events;
 
   return (GSource *) source;
 }
@@ -582,34 +584,37 @@ gst_bus_create_watch (GstBus * bus)
  * gst_bus_add_watch_full:
  * @bus: a #GstBus to create the watch for.
  * @priority: The priority of the watch.
- * @handler: A function to call when a message is received.
- * @user_data: user data passed to @handler.
+ * @events: a mask of #GstMessageType, representing the set of message types to
+ * watch for.
+ * @func: A function to call when a message is received.
+ * @user_data: user data passed to @func.
  * @notify: the function to call when the source is removed.
  *
- * Adds the bus to the mainloop with the given priority. If the handler returns
- * TRUE, the message will then be popped off the queue. When the handler is
- * called, the message belongs to the caller; if you want to keep a copy of it,
- * call gst_message_ref before leaving the handler.
+ * Adds the bus to the mainloop with the given priority. If the func returns
+ * FALSE, the func will be removed. 
+ *
+ * When the func is called, the message belongs to the caller; if you want to 
+ * keep a copy of it, call gst_message_ref before leaving the func.
  *
  * Returns: The event source id.
  *
  * MT safe.
  */
 guint
-gst_bus_add_watch_full (GstBus * bus, gint priority,
-    GstBusHandler handler, gpointer user_data, GDestroyNotify notify)
+gst_bus_add_watch_full (GstBus * bus, gint priority, GstMessageType events,
+    GstBusFunc func, gpointer user_data, GDestroyNotify notify)
 {
   guint id;
   GSource *source;
 
   g_return_val_if_fail (GST_IS_BUS (bus), 0);
 
-  source = gst_bus_create_watch (bus);
+  source = gst_bus_create_watch (bus, events);
 
   if (priority != G_PRIORITY_DEFAULT)
     g_source_set_priority (source, priority);
 
-  g_source_set_callback (source, (GSourceFunc) handler, user_data, notify);
+  g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
 
   id = g_source_attach (source, NULL);
   g_source_unref (source);
@@ -621,8 +626,10 @@ gst_bus_add_watch_full (GstBus * bus, gint priority,
 /**
  * gst_bus_add_watch:
  * @bus: a #GstBus to create the watch for
- * @handler: A function to call when a message is received.
- * @user_data: user data passed to @handler.
+ * @events: a mask of #GstMessageType, representing the set of message types to
+ * watch for.
+ * @func: A function to call when a message is received.
+ * @user_data: user data passed to @func.
  *
  * Adds the bus to the mainloop with the default priority.
  *
@@ -631,10 +638,11 @@ gst_bus_add_watch_full (GstBus * bus, gint priority,
  * MT safe.
  */
 guint
-gst_bus_add_watch (GstBus * bus, GstBusHandler handler, gpointer user_data)
+gst_bus_add_watch (GstBus * bus, GstMessageType events, GstBusFunc func,
+    gpointer user_data)
 {
-  return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, handler, user_data,
-      NULL);
+  return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, events, func,
+      user_data, NULL);
 }
 
 typedef struct
@@ -643,25 +651,26 @@ typedef struct
   guint timeout_id;
   gboolean source_running;
   GstMessageType events;
-  GstMessageType revent;
+  GstMessage *message;
 } GstBusPollData;
 
 static gboolean
-poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
+poll_func (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
 {
   if (!g_main_loop_is_running (poll_data->loop))
-    return FALSE;
+    return TRUE;
 
   if (GST_MESSAGE_TYPE (message) & poll_data->events) {
-    poll_data->revent = GST_MESSAGE_TYPE (message);
+    g_return_val_if_fail (poll_data->message == NULL, FALSE);
+    /* keep ref to message */
+    poll_data->message = gst_message_ref (message);
     g_main_loop_quit (poll_data->loop);
-
-    /* keep the message on the queue */
-    return FALSE;
   } else {
-    /* pop and unref the message */
-    return TRUE;
+    /* don't remove the source. */
   }
+  /* we always keep the source alive so that we don't accidentialy
+   * free the poll_data */
+  return TRUE;
 }
 
 static gboolean
@@ -669,8 +678,9 @@ poll_timeout (GstBusPollData * poll_data)
 {
   g_main_loop_quit (poll_data->loop);
 
-  /* returning FALSE will remove the source id */
-  return FALSE;
+  /* we don't remove the GSource as this would free our poll_data,
+   * which we still need */
+  return TRUE;
 }
 
 static void
@@ -706,24 +716,21 @@ poll_destroy_timeout (GstBusPollData * poll_data)
  *
  * This function will enter the default mainloop while polling.
  *
- * Returns: The type of the message that was received, or GST_MESSAGE_UNKNOWN if
- * the poll timed out. The message will remain in the bus queue; you will need
- * to gst_bus_pop() it off before entering gst_bus_poll() again.
+ * Returns: The message that was received, or NULL if the poll timed out. 
+ * The message is taken from the bus and needs to be unreffed after usage.
  */
-GstMessageType
+GstMessage *
 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
 {
   GstBusPollData *poll_data;
-  GstMessageType ret;
+  GstMessage *ret;
   guint id;
 
   poll_data = g_new0 (GstBusPollData, 1);
-  g_return_val_if_fail (poll_data != NULL, GST_MESSAGE_UNKNOWN);
-
   poll_data->source_running = TRUE;
   poll_data->loop = g_main_loop_new (NULL, FALSE);
   poll_data->events = events;
-  poll_data->revent = GST_MESSAGE_UNKNOWN;
+  poll_data->message = NULL;
 
   if (timeout >= 0)
     poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
@@ -732,10 +739,12 @@ gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
   else
     poll_data->timeout_id = 0;
 
-  id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE,
-      (GstBusHandler) poll_handler, poll_data, (GDestroyNotify) poll_destroy);
+  id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE, GST_MESSAGE_ANY,
+      (GstBusFunc) poll_func, poll_data, (GDestroyNotify) poll_destroy);
+
   g_main_loop_run (poll_data->loop);
-  ret = poll_data->revent;
+  /* holds a ref */
+  ret = poll_data->message;
 
   if (poll_data->timeout_id)
     g_source_remove (poll_data->timeout_id);
@@ -743,7 +752,7 @@ gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
   /* poll_data may get destroyed at any time now */
   g_source_remove (id);
 
-  GST_DEBUG_OBJECT (bus, "finished poll with messagetype %d", ret);
+  GST_DEBUG_OBJECT (bus, "finished poll with message %p", ret);
 
   return ret;
 }
index 50c6877..ff7625e 100644 (file)
@@ -59,24 +59,29 @@ typedef enum
  * @data: user data that has been given, when registering the handler
  *
  * Handler will be invoked synchronously, when a new message has been injected
- * into the bus.
+ * into the bus. This function is mostly used internally. Only one sync handler
+ * can be attached to a given bus.
  *
  * Returns: #GstBusSyncReply stating what to do with the message
  */
-typedef GstBusSyncReply (*GstBusSyncHandler)   (GstBus * bus, GstMessage * message, gpointer data);
+typedef GstBusSyncReply (*GstBusSyncHandler)   (GstBus * bus, GstMessage * message, gpointer data);
+
 /**
- * GstBusHandler:
+ * GstBusFunc:
  * @bus: the #GstBus that sent the message
  * @message: the #GstMessage
  * @data: user data that has been given, when registering the handler
  *
- * Handler will be invoked asynchronously, after a new message has been injected
- * into the bus. Return %TRUE if the message has been handled. It will then be
- * taken from the bus and _unref()'ed.
+ * Specifies the type of function passed to #gst_bus_add_watch() or 
+ * #gst_bus_add_watch_full(), which is called from the mainloop when a message
+ * is available on the bus.
+ *
+ * The message passed to the function will be unreffed after execution of this
+ * function so it should not be freed in the function. 
  *
- * Returns: %TRUE if message should be taken from the bus
+ * Returns: %FALSE if the event source should be removed. 
  */
-typedef gboolean       (*GstBusHandler)        (GstBus * bus, GstMessage * message, gpointer data);
+typedef gboolean       (*GstBusFunc)           (GstBus * bus, GstMessage * message, gpointer data);
 
 struct _GstBus
 {
@@ -107,7 +112,7 @@ GstBus*                     gst_bus_new                     (void);
 
 gboolean               gst_bus_post                    (GstBus * bus, GstMessage * message);
 
-gboolean               gst_bus_have_pending            (GstBus * bus);
+gboolean               gst_bus_have_pending            (GstBus * bus, GstMessageType events);
 GstMessage *           gst_bus_peek                    (GstBus * bus);
 GstMessage *           gst_bus_pop                     (GstBus * bus);
 void                   gst_bus_set_flushing            (GstBus * bus, gboolean flushing);
@@ -115,16 +120,19 @@ void                      gst_bus_set_flushing            (GstBus * bus, gboolean flushing);
 void                   gst_bus_set_sync_handler        (GstBus * bus, GstBusSyncHandler func,
                                                         gpointer data);
 
-GSource *              gst_bus_create_watch            (GstBus * bus);
+GSource *              gst_bus_create_watch            (GstBus * bus, GstMessageType events);
 guint                  gst_bus_add_watch_full          (GstBus * bus,
                                                         gint priority,
-                                                        GstBusHandler handler, 
+                                                        GstMessageType events,
+                                                        GstBusFunc func, 
                                                         gpointer user_data, 
                                                         GDestroyNotify notify);
 guint                  gst_bus_add_watch               (GstBus * bus,
-                                                        GstBusHandler handler, 
+                                                        GstMessageType events,
+                                                        GstBusFunc func, 
                                                         gpointer user_data);
-GstMessageType         gst_bus_poll                    (GstBus *bus, GstMessageType events,
+
+GstMessage*            gst_bus_poll                    (GstBus *bus, GstMessageType events,
                                                          GstClockTimeDiff timeout);
 
 G_END_DECLS
index fa704be..16c6f62 100644 (file)
@@ -31,10 +31,11 @@ pop_messages (GstBus * bus, int count)
 
   GST_DEBUG ("popping %d messages", count);
   for (i = 0; i < count; ++i) {
-    fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
+    message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+
+    fail_unless (message && GST_MESSAGE_TYPE (message)
         == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
 
-    message = gst_bus_pop (bus);
     gst_message_unref (message);
   }
   GST_DEBUG ("popped %d messages", count);
@@ -121,11 +122,11 @@ GST_START_TEST (test_message_state_changed)
   ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
 
   /* get and unref the message, causing a decref on the bin */
-  fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED,
-          -1) == GST_MESSAGE_STATE_CHANGED,
-      "did not get GST_MESSAGE_STATE_CHANGED");
+  message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+
+  fail_unless (message && GST_MESSAGE_TYPE (message)
+      == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
 
-  message = gst_bus_pop (bus);
   gst_message_unref (message);
 
   ASSERT_OBJECT_REFCOUNT (bin, "bin", 1);
@@ -166,10 +167,10 @@ GST_START_TEST (test_message_state_changed_child)
   ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
 
   /* get and unref the message, causing a decref on the src */
-  fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
+  message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+  fail_unless (message && GST_MESSAGE_TYPE (message)
       == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
 
-  message = gst_bus_pop (bus);
   fail_unless (message->src == GST_OBJECT (src));
   gst_message_unref (message);
 
@@ -177,10 +178,10 @@ GST_START_TEST (test_message_state_changed_child)
   ASSERT_OBJECT_REFCOUNT (bin, "bin", 2);
 
   /* get and unref message 2, causing a decref on the bin */
-  fail_unless (gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1)
+  message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+  fail_unless (message && GST_MESSAGE_TYPE (message)
       == GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED");
 
-  message = gst_bus_pop (bus);
   fail_unless (message->src == GST_OBJECT (bin));
   gst_message_unref (message);
 
@@ -341,7 +342,7 @@ GST_START_TEST (test_watch_for_state_change)
 
   pop_messages (bus, 5);
 
-  fail_unless (gst_bus_have_pending (bus) == FALSE,
+  fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
       "Unexpected messages on bus");
 
   gst_bin_watch_for_state_change (GST_BIN (bin));
@@ -349,7 +350,7 @@ GST_START_TEST (test_watch_for_state_change)
   /* should get the bin's state change message now */
   pop_messages (bus, 1);
 
-  fail_unless (gst_bus_have_pending (bus) == FALSE,
+  fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
       "Unexpected messages on bus");
 
   fail_unless (gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING)
@@ -364,7 +365,7 @@ GST_START_TEST (test_watch_for_state_change)
 
   pop_messages (bus, 3);
 
-  fail_unless (gst_bus_have_pending (bus) == FALSE,
+  fail_unless (gst_bus_have_pending (bus, GST_MESSAGE_ANY) == FALSE,
       "Unexpected messages on bus");
 
   /* setting bin to NULL flushes the bus automatically */
index 54ea722..8fb3609 100644 (file)
  * Boston, MA 02111-1307, USA.
  */
 
-
 #include <gst/check/gstcheck.h>
 
-
 static GstBus *test_bus = NULL;
+static GMainLoop *main_loop;
 
 #define NUM_MESSAGES 1000
 #define NUM_THREADS 10
@@ -99,8 +98,86 @@ GST_START_TEST (test_hammer_bus)
 
   gst_object_unref ((GstObject *) test_bus);
 }
-GST_END_TEST Suite *
-gstbus_suite (void)
+GST_END_TEST static gboolean
+message_func_eos (GstBus * bus, GstMessage * message, gpointer data)
+{
+  const GstStructure *s;
+  gint i;
+
+  g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE);
+
+  GST_DEBUG ("got EOS message");
+
+  s = gst_message_get_structure (message);
+  if (!gst_structure_get_int (s, "msg_id", &i))
+    g_critical ("Invalid message");
+
+  return i != 9;
+}
+
+static gboolean
+message_func_app (GstBus * bus, GstMessage * message, gpointer data)
+{
+  const GstStructure *s;
+  gint i;
+
+  g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION,
+      FALSE);
+
+  GST_DEBUG ("got APP message");
+
+  s = gst_message_get_structure (message);
+  if (!gst_structure_get_int (s, "msg_id", &i))
+    g_critical ("Invalid message");
+
+  return i != 9;
+}
+
+static gboolean
+send_messages (gpointer data)
+{
+  GstMessage *m;
+  GstStructure *s;
+  gint i;
+
+  for (i = 0; i < 10; i++) {
+    s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
+    m = gst_message_new_application (NULL, s);
+    gst_bus_post (test_bus, m);
+    s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
+    m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s);
+    gst_bus_post (test_bus, m);
+  }
+
+  return FALSE;
+}
+
+/* test id adding two watches for different message types calls the
+ * respective callbacks. */
+GST_START_TEST (test_watch)
+{
+  guint id1, id2;
+
+  test_bus = gst_bus_new ();
+
+  main_loop = g_main_loop_new (NULL, FALSE);
+
+  id2 = gst_bus_add_watch (test_bus, GST_MESSAGE_EOS, message_func_eos, NULL);
+  id1 =
+      gst_bus_add_watch (test_bus, GST_MESSAGE_APPLICATION, message_func_app,
+      NULL);
+
+  g_idle_add ((GSourceFunc) send_messages, NULL);
+  while (g_main_context_pending (NULL))
+    g_main_context_iteration (NULL, FALSE);
+
+  g_source_remove (id1);
+  g_source_remove (id2);
+  g_main_loop_unref (main_loop);
+
+  gst_object_unref ((GstObject *) test_bus);
+}
+GST_END_TEST Suite * gstbus_suite (void)
 {
   Suite *s = suite_create ("GstBus");
   TCase *tc_chain = tcase_create ("stresstest");
@@ -109,6 +186,7 @@ gstbus_suite (void)
 
   suite_add_tcase (s, tc_chain);
   tcase_add_test (tc_chain, test_hammer_bus);
+  tcase_add_test (tc_chain, test_watch);
   return s;
 }
 
index 698cabf..ab68db1 100644 (file)
@@ -87,15 +87,15 @@ GST_START_TEST (test_async_state_change_fake)
   while (!done) {
     GstMessage *message;
     GstState old, new;
-    GstMessageType type;
-
-    type = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
-    message = gst_bus_pop (bus);
-    gst_message_parse_state_changed (message, &old, &new);
-    GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
-    if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
-      done = TRUE;
-    gst_message_unref (message);
+
+    message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1);
+    if (message) {
+      gst_message_parse_state_changed (message, &old, &new);
+      GST_DEBUG_OBJECT (message->src, "state change from %d to %d", old, new);
+      if (message->src == GST_OBJECT (pipeline) && new == GST_STATE_PLAYING)
+        done = TRUE;
+      gst_message_unref (message);
+    }
   }
 
   g_object_set (G_OBJECT (pipeline), "play-timeout", 3 * GST_SECOND, NULL);
index 6872447..5024db4 100644 (file)
@@ -48,11 +48,14 @@ run_pipeline (GstElement * pipe, gchar * descr,
   gst_element_set_state (pipe, GST_STATE_PLAYING);
 
   while (1) {
-    revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
+    GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
 
-    /* always have to pop the message before getting back into poll */
-    if (revent != GST_MESSAGE_UNKNOWN)
-      gst_message_unref (gst_bus_pop (bus));
+    if (message) {
+      revent = GST_MESSAGE_TYPE (message);
+      gst_message_unref (message);
+    } else {
+      revent = GST_MESSAGE_UNKNOWN;
+    }
 
     if (revent == tevent) {
       break;
index 293e887..c40b88f 100644 (file)
@@ -59,11 +59,14 @@ run_pipeline (GstElement * pipe, gchar * descr,
   }
 
   while (1) {
-    revent = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
+    GstMessage *message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 2);
 
-    /* always have to pop the message before getting back into poll */
-    if (revent != GST_MESSAGE_UNKNOWN)
-      gst_message_unref (gst_bus_pop (bus));
+    if (message) {
+      revent = GST_MESSAGE_TYPE (message);
+      gst_message_unref (message);
+    } else {
+      revent = GST_MESSAGE_UNKNOWN;
+    }
 
     if (revent == tevent) {
       break;
@@ -138,6 +141,7 @@ GST_START_TEST (test_stop_from_app)
   GstElement *fakesrc, *fakesink, *pipeline;
   GstBus *bus;
   GstMessageType revent;
+  GstMessage *message;
 
   assert_live_count (GST_TYPE_BUFFER, 0);
 
@@ -159,17 +163,22 @@ GST_START_TEST (test_stop_from_app)
   g_assert (bus);
 
   /* will time out after half a second */
-  revent = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2);
-
+  message = gst_bus_poll (bus, GST_MESSAGE_APPLICATION, GST_SECOND / 2);
+  if (message) {
+    revent = GST_MESSAGE_TYPE (message);
+    gst_message_unref (message);
+  } else {
+    revent = GST_MESSAGE_UNKNOWN;
+  }
   g_return_if_fail (revent == GST_MESSAGE_APPLICATION);
-  gst_message_unref (gst_bus_pop (bus));
 
   gst_element_set_state (pipeline, GST_STATE_NULL);
   gst_object_unref (pipeline);
 
   assert_live_count (GST_TYPE_BUFFER, 0);
 }
-GST_END_TEST Suite * simple_launch_lines_suite (void)
+GST_END_TEST Suite *
+simple_launch_lines_suite (void)
 {
   Suite *s = suite_create ("Pipelines");
   TCase *tc_chain = tcase_create ("linear");
index f91abdf..448d8cd 100644 (file)
@@ -356,7 +356,6 @@ static gboolean
 event_loop (GstElement * pipeline, gboolean blocking)
 {
   GstBus *bus;
-  GstMessageType revent;
   GstMessage *message = NULL;
 
   bus = gst_element_get_bus (GST_ELEMENT (pipeline));
@@ -364,17 +363,14 @@ event_loop (GstElement * pipeline, gboolean blocking)
   g_timeout_add (50, (GSourceFunc) check_intr, pipeline);
 
   while (TRUE) {
-    revent = gst_bus_poll (bus, GST_MESSAGE_ANY, blocking ? -1 : 0);
+    message = gst_bus_poll (bus, GST_MESSAGE_ANY, blocking ? -1 : 0);
 
     /* if the poll timed out, only when !blocking */
-    if (revent == GST_MESSAGE_UNKNOWN) {
+    if (message == NULL) {
       gst_object_unref (bus);
       return FALSE;
     }
 
-    message = gst_bus_pop (bus);
-    g_return_val_if_fail (message != NULL, TRUE);
-
     if (messages) {
       const GstStructure *s;
 
@@ -390,7 +386,7 @@ event_loop (GstElement * pipeline, gboolean blocking)
       }
     }
 
-    switch (revent) {
+    switch (GST_MESSAGE_TYPE (message)) {
       case GST_MESSAGE_EOS:
         g_print (_
             ("Got EOS from element \"%s\".\n"),
index ac2cbe1..a126725 100644 (file)
@@ -12,18 +12,16 @@ static gboolean
 event_loop (GstElement * pipeline)
 {
   GstBus *bus;
-  GstMessageType revent;
   GstMessage *message = NULL;
 
   bus = gst_element_get_bus (GST_ELEMENT (pipeline));
 
   while (TRUE) {
-    revent = gst_bus_poll (bus, GST_MESSAGE_ANY, -1);
+    message = gst_bus_poll (bus, GST_MESSAGE_ANY, -1);
 
-    message = gst_bus_pop (bus);
     g_return_val_if_fail (message != NULL, TRUE);
 
-    switch (revent) {
+    switch (GST_MESSAGE_TYPE (message)) {
       case GST_MESSAGE_EOS:
         gst_message_unref (message);
         return FALSE;