message, bus: fix async message delivery
authorTim-Philipp Müller <tim@centricular.com>
Mon, 16 Feb 2015 22:39:42 +0000 (22:39 +0000)
committerTim-Philipp Müller <tim@centricular.com>
Tue, 17 Feb 2015 09:52:09 +0000 (09:52 +0000)
Async message delivery (where the posting thread gets blocked
until the message has been processed and/or freed) was pretty
much completely broken.

For one, don't use GMutex implementation details to check
whether a mutex has been initialized or not, esp. not
implementation details that don't hold true any more with
newer GLib versions where atomic ops and futexes are used
(spotted by Josep Torras). This led to async message
delivery no longer blocking with newer GLib versions on
Linux.

Secondly, after async delivery don't free mutex/GCond
embedded inside the just-freed message structure.

Use a new (private) mini object flag to signal GstMessage
that the message being freed is part of an async delivery
on the bus so that the dispose handler can keep the message
alive and the bus can free it once it's done cleaning up
stuff.

gst/gst_private.h
gst/gstbus.c
gst/gstmessage.c

index 8c9cb4b..d0aefb4 100644 (file)
@@ -426,5 +426,8 @@ struct _GstDeviceProviderFactoryClass {
   gpointer _gst_reserved[GST_PADDING];
 };
 
+/* privat flag used by GstBus / GstMessage */
+#define GST_MESSAGE_FLAG_ASYNC_DELIVERY (GST_MINI_OBJECT_FLAG_LAST << 0)
+
 G_END_DECLS
 #endif /* __GST_PRIVATE_H__ */
index ff458cd..13b0546 100644 (file)
@@ -308,6 +308,10 @@ gst_bus_post (GstBus * bus, GstMessage * message)
   GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus %" GST_PTR_FORMAT, message,
       message);
 
+  /* check we didn't accidentally add a public flag that maps to same value */
+  g_assert (!GST_MINI_OBJECT_FLAG_IS_SET (message,
+          GST_MESSAGE_FLAG_ASYNC_DELIVERY));
+
   GST_OBJECT_LOCK (bus);
   /* check if the bus is flushing */
   if (GST_OBJECT_FLAG_IS_SET (bus, GST_BUS_FLUSHING))
@@ -357,6 +361,8 @@ gst_bus_post (GstBus * bus, GstMessage * message)
       g_cond_init (cond);
       g_mutex_init (lock);
 
+      GST_MINI_OBJECT_FLAG_SET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY);
+
       GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
 
       /* now we lock the message mutex, send the message to the async
@@ -369,12 +375,18 @@ gst_bus_post (GstBus * bus, GstMessage * message)
 
       /* now block till the message is freed */
       g_cond_wait (cond, lock);
+
+      /* we acquired a new ref from gst_message_dispose() so we can clean up */
       g_mutex_unlock (lock);
 
       GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
 
+      GST_MINI_OBJECT_FLAG_UNSET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY);
+
       g_mutex_clear (lock);
       g_cond_clear (cond);
+
+      gst_message_unref (message);
       break;
     }
     default:
index 8d3892c..9f4d286 100644 (file)
@@ -166,6 +166,26 @@ gst_message_type_to_quark (GstMessageType type)
   return 0;
 }
 
+static gboolean
+_gst_message_dispose (GstMessage * message)
+{
+  gboolean do_free = TRUE;
+
+  if (GST_MINI_OBJECT_FLAG_IS_SET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY)) {
+    GST_INFO ("[msg %p] signalling async free", message);
+
+    GST_MESSAGE_LOCK (message);
+    GST_MESSAGE_SIGNAL (message);
+    GST_MESSAGE_UNLOCK (message);
+
+    /* don't free it yet, let bus finish with it first */
+    gst_message_ref (message);
+    do_free = FALSE;
+  }
+
+  return do_free;
+}
+
 static void
 _gst_message_free (GstMessage * message)
 {
@@ -181,12 +201,6 @@ _gst_message_free (GstMessage * message)
     GST_MESSAGE_SRC (message) = NULL;
   }
 
-  if (message->lock.p) {
-    GST_MESSAGE_LOCK (message);
-    GST_MESSAGE_SIGNAL (message);
-    GST_MESSAGE_UNLOCK (message);
-  }
-
   structure = GST_MESSAGE_STRUCTURE (message);
   if (structure) {
     gst_structure_set_parent_refcount (structure, NULL);
@@ -235,7 +249,8 @@ gst_message_init (GstMessageImpl * message, GstMessageType type,
     GstObject * src)
 {
   gst_mini_object_init (GST_MINI_OBJECT_CAST (message), 0, _gst_message_type,
-      (GstMiniObjectCopyFunction) _gst_message_copy, NULL,
+      (GstMiniObjectCopyFunction) _gst_message_copy,
+      (GstMiniObjectDisposeFunction) _gst_message_dispose,
       (GstMiniObjectFreeFunction) _gst_message_free);
 
   GST_MESSAGE_TYPE (message) = type;