bus: Ensure that only one GSource can be attached to the bus
authorThibault Saunier <tsaunier@igalia.com>
Thu, 10 Dec 2020 18:48:32 +0000 (15:48 -0300)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 11 Dec 2020 12:29:50 +0000 (12:29 +0000)
Until now we were enforcing that only 1 signal GSource was attached
the bus but we could attach as many GSource with `gst_bus_create_watch`
as we wanted... but in the end only 1 GSource will ever be dispatched for
a given `GstMessage` leading to totally broken behavior.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/718>

gst/gstbus.c
tests/check/gst/gstbus.c

index 5fea8b7..589184f 100644 (file)
@@ -147,7 +147,7 @@ struct _GstBusPrivate
   guint num_signal_watchers;
 
   guint num_sync_message_emitters;
-  GSource *signal_watch;
+  GSource *gsource;
 
   gboolean enable_async;
   GstPoll *poll;
@@ -869,8 +869,8 @@ gst_bus_source_dispose (GSource * source)
   GST_DEBUG_OBJECT (bus, "disposing source %p", source);
 
   GST_OBJECT_LOCK (bus);
-  if (bus->priv->signal_watch == source)
-    bus->priv->signal_watch = NULL;
+  if (bus->priv->gsource == source)
+    bus->priv->gsource = NULL;
   GST_OBJECT_UNLOCK (bus);
 }
 #endif
@@ -885,13 +885,12 @@ gst_bus_source_finalize (GSource * source)
   GST_DEBUG_OBJECT (bus, "finalize source %p", source);
 
   GST_OBJECT_LOCK (bus);
-  if (bus->priv->signal_watch == source)
-    bus->priv->signal_watch = NULL;
+  if (bus->priv->gsource == source)
+    bus->priv->gsource = NULL;
   GST_OBJECT_UNLOCK (bus);
 #endif
 
-  gst_object_unref (bsource->bus);
-  bsource->bus = NULL;
+  gst_clear_object (&bsource->bus);
 }
 
 static GSourceFuncs gst_bus_source_funcs = {
@@ -901,6 +900,33 @@ static GSourceFuncs gst_bus_source_funcs = {
   gst_bus_source_finalize
 };
 
+
+static GSource *
+gst_bus_create_watch_unlocked (GstBus * bus)
+{
+  GstBusSource *source;
+
+  if (bus->priv->gsource) {
+    GST_ERROR_OBJECT (bus,
+        "Tried to add new GSource while one was already there");
+    return NULL;
+  }
+
+  bus->priv->gsource = g_source_new (&gst_bus_source_funcs,
+      sizeof (GstBusSource));
+  source = (GstBusSource *) bus->priv->gsource;
+
+  g_source_set_name ((GSource *) source, "GStreamer message bus watch");
+#if GLIB_CHECK_VERSION(2,63,3)
+  g_source_set_dispose_function ((GSource *) source, gst_bus_source_dispose);
+#endif
+
+  source->bus = gst_object_ref (bus);
+  g_source_add_poll ((GSource *) source, &bus->priv->pollfd);
+
+  return (GSource *) source;
+}
+
 /**
  * gst_bus_create_watch:
  * @bus: a #GstBus to create the watch for
@@ -909,28 +935,24 @@ static GSourceFuncs gst_bus_source_funcs = {
  * a message is on the bus. After the GSource is dispatched, the
  * message is popped off the bus and unreffed.
  *
+ * As with other watches, there can only be one watch on the bus, including
+ * any signal watch added with #gst_bus_add_signal_watch.
+ *
  * Returns: (transfer full) (nullable): a #GSource that can be added to a mainloop.
  */
 GSource *
 gst_bus_create_watch (GstBus * bus)
 {
-  GstBusSource *source;
+  GSource *source;
 
   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
   g_return_val_if_fail (bus->priv->poll != NULL, NULL);
 
-  source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
-      sizeof (GstBusSource));
-
-  g_source_set_name ((GSource *) source, "GStreamer message bus watch");
-#if GLIB_CHECK_VERSION(2,63,3)
-  g_source_set_dispose_function ((GSource *) source, gst_bus_source_dispose);
-#endif
-
-  source->bus = gst_object_ref (bus);
-  g_source_add_poll ((GSource *) source, &bus->priv->pollfd);
+  GST_OBJECT_LOCK (bus);
+  source = gst_bus_create_watch_unlocked (bus);
+  GST_OBJECT_UNLOCK (bus);
 
-  return (GSource *) source;
+  return source;
 }
 
 /* must be called with the bus OBJECT LOCK */
@@ -942,13 +964,13 @@ gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority,
   guint id;
   GSource *source;
 
-  if (bus->priv->signal_watch) {
+  if (bus->priv->gsource) {
     GST_ERROR_OBJECT (bus,
         "Tried to add new watch while one was already there");
     return 0;
   }
 
-  source = gst_bus_create_watch (bus);
+  source = gst_bus_create_watch_unlocked (bus);
   if (!source) {
     g_critical ("Creating bus watch failed");
     return 0;
@@ -964,7 +986,7 @@ gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority,
   g_source_unref (source);
 
   if (id) {
-    bus->priv->signal_watch = source;
+    bus->priv->gsource = source;
   }
 
   GST_DEBUG_OBJECT (bus, "New source %p with id %u", source, id);
@@ -1076,7 +1098,7 @@ gst_bus_remove_watch (GstBus * bus)
 
   GST_OBJECT_LOCK (bus);
 
-  if (bus->priv->signal_watch == NULL) {
+  if (bus->priv->gsource == NULL) {
     GST_ERROR_OBJECT (bus, "no bus watch was present");
     goto error;
   }
@@ -1087,9 +1109,8 @@ gst_bus_remove_watch (GstBus * bus)
     goto error;
   }
 
-  source =
-      bus->priv->signal_watch ? g_source_ref (bus->priv->signal_watch) : NULL;
-  bus->priv->signal_watch = NULL;
+  source = g_source_ref (bus->priv->gsource);
+  bus->priv->gsource = NULL;
 
   GST_OBJECT_UNLOCK (bus);
 
@@ -1411,13 +1432,13 @@ gst_bus_add_signal_watch_full (GstBus * bus, gint priority)
   if (bus->priv->num_signal_watchers > 0)
     goto done;
 
-  /* this should not fail because the counter above takes care of it */
-  g_assert (!bus->priv->signal_watch);
+  if (bus->priv->gsource)
+    goto has_gsource;
 
   gst_bus_add_watch_full_unlocked (bus, priority, gst_bus_async_signal_func,
       NULL, NULL);
 
-  if (G_UNLIKELY (!bus->priv->signal_watch))
+  if (G_UNLIKELY (!bus->priv->gsource))
     goto add_failed;
 
 done:
@@ -1434,6 +1455,12 @@ add_failed:
     GST_OBJECT_UNLOCK (bus);
     return;
   }
+has_gsource:
+  {
+    g_critical ("Bus %s already has a GSource watch", GST_OBJECT_NAME (bus));
+    GST_OBJECT_UNLOCK (bus);
+    return;
+  }
 }
 
 /**
@@ -1487,12 +1514,12 @@ gst_bus_remove_signal_watch (GstBus * bus)
   if (bus->priv->num_signal_watchers > 0)
     goto done;
 
-  GST_DEBUG_OBJECT (bus, "removing signal watch %u",
-      g_source_get_id (bus->priv->signal_watch));
+  GST_DEBUG_OBJECT (bus, "removing gsource %u",
+      g_source_get_id (bus->priv->gsource));
 
-  source =
-      bus->priv->signal_watch ? g_source_ref (bus->priv->signal_watch) : NULL;
-  bus->priv->signal_watch = NULL;
+  g_assert (bus->priv->gsource);
+  source = g_source_ref (bus->priv->gsource);
+  bus->priv->gsource = NULL;
 
 done:
   GST_OBJECT_UNLOCK (bus);
index 78aa0f1..578e9d9 100644 (file)
@@ -900,6 +900,32 @@ GST_START_TEST (test_async_message)
 
 GST_END_TEST;
 
+GST_START_TEST (test_single_gsource)
+{
+  GstBus *bus = gst_bus_new ();
+  GSource *source = gst_bus_create_watch (bus);
+  g_source_attach (source, NULL);
+  g_source_unref (source);
+
+  source = gst_bus_create_watch (bus);
+  fail_if (source, "Only one GSource can be added to a bus");
+
+  ASSERT_CRITICAL (gst_bus_add_signal_watch (bus));
+  ASSERT_CRITICAL (gst_bus_remove_signal_watch (bus));
+
+  fail_unless (gst_bus_remove_watch (bus), "Could not remove watch");
+  gst_bus_add_signal_watch (bus);
+
+  fail_if (gst_bus_remove_watch (bus), "Signal watch should be removed"
+      " with gst_bus_remove_signal_watch");
+
+  gst_bus_remove_signal_watch (bus);
+
+  gst_object_unref (bus);
+}
+
+GST_END_TEST;
+
 static Suite *
 gst_bus_suite (void)
 {
@@ -922,6 +948,7 @@ gst_bus_suite (void)
   tcase_add_test (tc_chain, test_timed_pop_filtered_with_timeout);
   tcase_add_test (tc_chain, test_custom_main_context);
   tcase_add_test (tc_chain, test_async_message);
+  tcase_add_test (tc_chain, test_single_gsource);
   return s;
 }