check/gst/gstbin.c (test_message_state_changed_children): Style fix..
authorAndy Wingo <wingo@pobox.com>
Thu, 10 Nov 2005 18:15:24 +0000 (18:15 +0000)
committerAndy Wingo <wingo@pobox.com>
Thu, 10 Nov 2005 18:15:24 +0000 (18:15 +0000)
Original commit message from CVS:
2005-11-10  Andy Wingo  <wingo@pobox.com>

* check/gst/gstbin.c (test_message_state_changed_children): Style
fix..

* gst/gstbus.c (poll_destroy, poll_func, gst_bus_poll): Implement
gst_bus_poll with the signal watch. Ensures that poll and a signal
watch see the same messages.

* check/gst/gstbus.c (test_watch_with_poll): New test, checks that
a poll and a watch at the same time get the same messages.

ChangeLog
check/gst/gstbin.c
check/gst/gstbus.c
gst/gstbus.c
tests/check/gst/gstbin.c
tests/check/gst/gstbus.c

index 631192f..4ed2e13 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,15 @@
+2005-11-10  Andy Wingo  <wingo@pobox.com>
+
+       * check/gst/gstbin.c (test_message_state_changed_children): Style
+       fix..
+
+       * gst/gstbus.c (poll_destroy, poll_func, gst_bus_poll): Implement
+       gst_bus_poll with the signal watch. Ensures that poll and a signal
+       watch see the same messages.
+
+       * check/gst/gstbus.c (test_watch_with_poll): New test, checks that
+       a poll and a watch at the same time get the same messages.
+
 2005-11-10  Thomas Vander Stichele  <thomas at apestaart dot org>
 
        * gst/base/gstbasetransform.c: (gst_base_transform_transform_caps):
index 46b1fdd..a71a4d8 100644 (file)
@@ -252,7 +252,7 @@ GST_START_TEST (test_message_state_changed_children)
   ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 2);
 
   pop_messages (bus, 3);
-  fail_if ((gst_bus_pop (bus)) != NULL);
+  fail_if (gst_bus_have_pending (bus), "unexpected pending messages");
 
   ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
   ASSERT_OBJECT_REFCOUNT (src, "src", 1);
index 9729b61..ebe9fe1 100644 (file)
@@ -177,6 +177,53 @@ GST_START_TEST (test_watch)
 
   gst_object_unref ((GstObject *) test_bus);
 }
+GST_END_TEST static gint messages_seen = 0;
+
+static void
+message_func (GstBus * bus, GstMessage * message, gpointer data)
+{
+  g_return_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION);
+
+  messages_seen++;
+}
+
+static void
+send_10_app_messages (void)
+{
+  GstMessage *m;
+  GstStructure *s;
+  gint i;
+
+  for (i = 0; i < 10; i++) {
+    s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
+    m = gst_message_new_application (NULL, s);
+    gst_bus_post (test_bus, m);
+  }
+}
+
+/* test that you get the same messages from a poll as from signal watches. */
+GST_START_TEST (test_watch_with_poll)
+{
+  guint i;
+
+  test_bus = gst_bus_new ();
+
+  gst_bus_add_signal_watch (test_bus);
+  g_signal_connect (test_bus, "message", (GCallback) message_func, NULL);
+
+  send_10_app_messages ();
+
+  for (i = 0; i < 10; i++)
+    gst_message_unref (gst_bus_poll (test_bus, GST_MESSAGE_APPLICATION,
+            GST_CLOCK_TIME_NONE));
+
+  fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
+  fail_unless (messages_seen == 10, "signal handler didn't get 10 messages");
+
+  gst_bus_remove_signal_watch (test_bus);
+
+  gst_object_unref (test_bus);
+}
 GST_END_TEST Suite * gstbus_suite (void)
 {
   Suite *s = suite_create ("GstBus");
@@ -187,6 +234,7 @@ GST_END_TEST Suite * gstbus_suite (void)
   suite_add_tcase (s, tc_chain);
   tcase_add_test (tc_chain, test_hammer_bus);
   tcase_add_test (tc_chain, test_watch);
+  tcase_add_test (tc_chain, test_watch_with_poll);
   return s;
 }
 
index dcc1d4b..6b43a2d 100644 (file)
@@ -731,25 +731,21 @@ typedef struct
   GstMessage *message;
 } GstBusPollData;
 
-static gboolean
+static void
 poll_func (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
 {
   if (!g_main_loop_is_running (poll_data->loop)) {
     GST_DEBUG ("mainloop %p not running", poll_data->loop);
-    return TRUE;
+    return;
   }
 
   if (GST_MESSAGE_TYPE (message) & poll_data->events) {
-    g_return_val_if_fail (poll_data->message == NULL, FALSE);
+    g_return_if_fail (poll_data->message == NULL);
     /* keep ref to message */
     poll_data->message = gst_message_ref (message);
     GST_DEBUG ("mainloop %p quit", poll_data->loop);
     g_main_loop_quit (poll_data->loop);
   }
-
-  /* we always keep the source alive so that we don't accidentialy
-   * free the poll_data */
-  return TRUE;
 }
 
 static gboolean
@@ -764,7 +760,7 @@ poll_timeout (GstBusPollData * poll_data)
 }
 
 static void
-poll_destroy (GstBusPollData * poll_data)
+poll_destroy (GstBusPollData * poll_data, gpointer unused)
 {
   poll_data->source_running = FALSE;
   if (!poll_data->timeout_id) {
@@ -796,7 +792,14 @@ poll_destroy_timeout (GstBusPollData * poll_data)
  *
  * All messages not in @events will be popped off the bus and will be ignored.
  *
- * This function will enter the default mainloop while polling.
+ * Because poll is implemented using the "message" signal enabled by
+ * gst_bus_add_signal_watch(), calling gst_bus_poll() will cause the "message"
+ * signal to be emitted for every message that poll sees. Thus a "message"
+ * signal handler will see the same messages that this function sees -- neither
+ * will steal messages from the other.
+ *
+ * This function will run a main loop from the default main context when
+ * polling.
  *
  * Returns: The message that was received, or NULL if the poll timed out.
  * The message is taken from the bus and needs to be unreffed after usage.
@@ -806,7 +809,7 @@ gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
 {
   GstBusPollData *poll_data;
   GstMessage *ret;
-  guint id;
+  gulong id;
 
   poll_data = g_new0 (GstBusPollData, 1);
   poll_data->source_running = TRUE;
@@ -821,20 +824,26 @@ gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
   else
     poll_data->timeout_id = 0;
 
-  id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT,
-      (GstBusFunc) poll_func, poll_data, (GDestroyNotify) poll_destroy);
+  id = g_signal_connect_data (bus, "message", G_CALLBACK (poll_func), poll_data,
+      (GClosureNotify) poll_destroy, 0);
+
+  /* these can be nested, so it's ok */
+  gst_bus_add_signal_watch (bus);
 
   GST_DEBUG ("running mainloop %p", poll_data->loop);
   g_main_loop_run (poll_data->loop);
   GST_DEBUG ("mainloop stopped %p", poll_data->loop);
+
+  gst_bus_remove_signal_watch (bus);
+
   /* holds a ref */
   ret = poll_data->message;
 
   if (poll_data->timeout_id)
     g_source_remove (poll_data->timeout_id);
 
-  /* poll_data may get destroyed at any time now */
-  g_source_remove (id);
+  /* poll_data will be freed now */
+  g_signal_handler_disconnect (bus, id);
 
   GST_DEBUG_OBJECT (bus, "finished poll with message %p", ret);
 
index 46b1fdd..a71a4d8 100644 (file)
@@ -252,7 +252,7 @@ GST_START_TEST (test_message_state_changed_children)
   ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 2);
 
   pop_messages (bus, 3);
-  fail_if ((gst_bus_pop (bus)) != NULL);
+  fail_if (gst_bus_have_pending (bus), "unexpected pending messages");
 
   ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
   ASSERT_OBJECT_REFCOUNT (src, "src", 1);
index 9729b61..ebe9fe1 100644 (file)
@@ -177,6 +177,53 @@ GST_START_TEST (test_watch)
 
   gst_object_unref ((GstObject *) test_bus);
 }
+GST_END_TEST static gint messages_seen = 0;
+
+static void
+message_func (GstBus * bus, GstMessage * message, gpointer data)
+{
+  g_return_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION);
+
+  messages_seen++;
+}
+
+static void
+send_10_app_messages (void)
+{
+  GstMessage *m;
+  GstStructure *s;
+  gint i;
+
+  for (i = 0; i < 10; i++) {
+    s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
+    m = gst_message_new_application (NULL, s);
+    gst_bus_post (test_bus, m);
+  }
+}
+
+/* test that you get the same messages from a poll as from signal watches. */
+GST_START_TEST (test_watch_with_poll)
+{
+  guint i;
+
+  test_bus = gst_bus_new ();
+
+  gst_bus_add_signal_watch (test_bus);
+  g_signal_connect (test_bus, "message", (GCallback) message_func, NULL);
+
+  send_10_app_messages ();
+
+  for (i = 0; i < 10; i++)
+    gst_message_unref (gst_bus_poll (test_bus, GST_MESSAGE_APPLICATION,
+            GST_CLOCK_TIME_NONE));
+
+  fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
+  fail_unless (messages_seen == 10, "signal handler didn't get 10 messages");
+
+  gst_bus_remove_signal_watch (test_bus);
+
+  gst_object_unref (test_bus);
+}
 GST_END_TEST Suite * gstbus_suite (void)
 {
   Suite *s = suite_create ("GstBus");
@@ -187,6 +234,7 @@ GST_END_TEST Suite * gstbus_suite (void)
   suite_add_tcase (s, tc_chain);
   tcase_add_test (tc_chain, test_hammer_bus);
   tcase_add_test (tc_chain, test_watch);
+  tcase_add_test (tc_chain, test_watch_with_poll);
   return s;
 }