gst/gstbus.c: gst_bus_poll may be called from other threads. Handle this nicely by...
authorJan Schmidt <thaytan@mad.scientist.com>
Fri, 29 Jul 2005 15:34:52 +0000 (15:34 +0000)
committerJan Schmidt <thaytan@mad.scientist.com>
Fri, 29 Jul 2005 15:34:52 +0000 (15:34 +0000)
Original commit message from CVS:
* gst/gstbus.c: (gst_bus_set_flushing), (gst_bus_pop),
(gst_bus_peek), (gst_bus_source_dispatch),
(gst_bus_add_watch_full), (poll_handler), (poll_timeout),
(poll_destroy), (poll_destroy_timeout), (gst_bus_poll):
gst_bus_poll may be called from other threads. Handle
this nicely by not making poll_data disappear off the
stack once gst_bus_poll returns.
gst_bus_peek now increments the refcount on the returned
message.

ChangeLog
gst/gstbus.c

index f595ffe..cfff567 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,15 @@
+2005-07-29  Jan Schmidt  <thaytan@mad.scientist.com>
+
+       * gst/gstbus.c: (gst_bus_set_flushing), (gst_bus_pop),
+       (gst_bus_peek), (gst_bus_source_dispatch),
+       (gst_bus_add_watch_full), (poll_handler), (poll_timeout),
+       (poll_destroy), (poll_destroy_timeout), (gst_bus_poll):
+         gst_bus_poll may be called from other threads. Handle
+         this nicely by not making poll_data disappear off the
+         stack once gst_bus_poll returns.
+         gst_bus_peek now increments the refcount on the returned
+         message.
+
 2005-07-29  Wim Taymans  <wim@fluendo.com>
 
        * docs/design/part-gstghostpad.txt:
index 3966416..9551fd5 100644 (file)
@@ -306,12 +306,12 @@ gst_bus_set_flushing (GstBus * bus, gboolean flushing)
   if (flushing) {
     GST_FLAG_SET (bus, GST_BUS_FLUSHING);
 
-    GST_DEBUG ("set bus flushing");
+    GST_DEBUG_OBJECT (bus, "set bus flushing");
 
     while ((message = gst_bus_pop (bus)))
       gst_message_unref (message);
   } else {
-    GST_DEBUG ("unset bus flushing");
+    GST_DEBUG_OBJECT (bus, "unset bus flushing");
     GST_FLAG_UNSET (bus, GST_BUS_FLUSHING);
   }
 
@@ -339,7 +339,7 @@ gst_bus_pop (GstBus * bus)
   message = g_queue_pop_head (bus->queue);
   g_mutex_unlock (bus->queue_lock);
 
-  GST_DEBUG ("pop on bus, got message %p", message);
+  GST_DEBUG_OBJECT (bus, "pop on bus, got message %p", message);
 
   return message;
 }
@@ -348,15 +348,14 @@ gst_bus_pop (GstBus * bus)
  * gst_bus_peek:
  * @bus: a #GstBus
  *
- * Peek the message on the top of the bus' queue. The bus maintains ownership of
- * the message, and the message will remain on the bus' message queue.
+ * Peek the message on the top of the bus' queue. The message will remain 
+ * on the bus' message queue. A reference is returned, and needs to be freed
+ * by the caller.
  *
  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
  *
  * MT safe.
  */
-/* FIXME, dangerous as the bus could be set to flushing while the app holds
- * a ref to the message */
 GstMessage *
 gst_bus_peek (GstBus * bus)
 {
@@ -366,9 +365,11 @@ gst_bus_peek (GstBus * bus)
 
   g_mutex_lock (bus->queue_lock);
   message = g_queue_peek_head (bus->queue);
+  if (message)
+    gst_message_ref (message);
   g_mutex_unlock (bus->queue_lock);
 
-  GST_DEBUG ("peek on bus, got message %p", message);
+  GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
 
   return message;
 }
@@ -433,18 +434,19 @@ gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
 
   message = gst_bus_peek (bus);
 
-  GST_DEBUG ("have message %p", message);
+  GST_DEBUG_OBJECT (bus, "source %p have message %p", source, message);
 
   g_return_val_if_fail (message != NULL, TRUE);
 
   if (!handler)
     goto no_handler;
 
-  GST_DEBUG ("calling dispatch with %p", message);
+  GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
 
   needs_pop = handler (bus, message, user_data);
+  gst_message_unref (message);
 
-  GST_DEBUG ("handler returns %d", needs_pop);
+  GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, needs_pop);
   if (needs_pop) {
     message = gst_bus_pop (bus);
     if (message) {
@@ -463,6 +465,7 @@ no_handler:
   {
     g_warning ("GstBus watch dispatched without callback\n"
         "You must call g_source_connect().");
+    gst_message_unref (message);
     return FALSE;
   }
 }
@@ -542,6 +545,7 @@ gst_bus_add_watch_full (GstBus * bus, gint priority,
   id = g_source_attach (source, NULL);
   g_source_unref (source);
 
+  GST_DEBUG_OBJECT (bus, "New source %p\n", source);
   return id;
 }
 
@@ -568,6 +572,7 @@ typedef struct
 {
   GMainLoop *loop;
   guint timeout_id;
+  gboolean source_running;
   GstMessageType events;
   GstMessageType revent;
 } GstBusPollData;
@@ -575,10 +580,13 @@ typedef struct
 static gboolean
 poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
 {
+  if (!g_main_loop_is_running (poll_data->loop))
+    return FALSE;
+
   if (GST_MESSAGE_TYPE (message) & poll_data->events) {
     poll_data->revent = GST_MESSAGE_TYPE (message);
-    if (g_main_loop_is_running (poll_data->loop))
-      g_main_loop_quit (poll_data->loop);
+    g_main_loop_quit (poll_data->loop);
+
     /* keep the message on the queue */
     return FALSE;
   } else {
@@ -590,12 +598,32 @@ poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
 static gboolean
 poll_timeout (GstBusPollData * poll_data)
 {
-  poll_data->timeout_id = 0;
   g_main_loop_quit (poll_data->loop);
+
   /* returning FALSE will remove the source id */
   return FALSE;
 }
 
+static void
+poll_destroy (GstBusPollData * poll_data)
+{
+  poll_data->source_running = FALSE;
+  if (!poll_data->timeout_id) {
+    g_main_loop_unref (poll_data->loop);
+    g_free (poll_data);
+  }
+}
+
+static void
+poll_destroy_timeout (GstBusPollData * poll_data)
+{
+  poll_data->timeout_id = 0;
+  if (!poll_data->source_running) {
+    g_main_loop_unref (poll_data->loop);
+    g_free (poll_data);
+  }
+}
+
 /**
  * gst_bus_poll:
  * @bus: a #GstBus
@@ -616,29 +644,37 @@ poll_timeout (GstBusPollData * poll_data)
 GstMessageType
 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
 {
-  GstBusPollData poll_data;
+  GstBusPollData *poll_data;
   GstMessageType ret;
   guint id;
 
+  poll_data = g_new0 (GstBusPollData, 1);
+  g_return_val_if_fail (poll_data != NULL, GST_MESSAGE_UNKNOWN);
+
+  poll_data->source_running = TRUE;
+  poll_data->loop = g_main_loop_new (NULL, FALSE);
+  poll_data->events = events;
+  poll_data->revent = GST_MESSAGE_UNKNOWN;
+
   if (timeout >= 0)
-    poll_data.timeout_id = g_timeout_add (timeout / GST_MSECOND,
-        (GSourceFunc) poll_timeout, &poll_data);
+    poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
+        timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
+        (GDestroyNotify) poll_destroy_timeout);
   else
-    poll_data.timeout_id = 0;
+    poll_data->timeout_id = 0;
 
-  poll_data.loop = g_main_loop_new (NULL, FALSE);
-  poll_data.events = events;
-  poll_data.revent = GST_MESSAGE_UNKNOWN;
+  id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE,
+      (GstBusHandler) poll_handler, poll_data, (GDestroyNotify) poll_destroy);
+  g_main_loop_run (poll_data->loop);
+  ret = poll_data->revent;
 
-  id = gst_bus_add_watch (bus, (GstBusHandler) poll_handler, &poll_data);
-  g_main_loop_run (poll_data.loop);
-  g_source_remove (id);
+  if (poll_data->timeout_id)
+    g_source_remove (poll_data->timeout_id);
 
-  ret = poll_data.revent;
+  /* poll_data may get destroyed at any time now */
+  g_source_remove (id);
 
-  if (poll_data.timeout_id)
-    g_source_remove (poll_data.timeout_id);
-  g_main_loop_unref (poll_data.loop);
+  GST_DEBUG_OBJECT (bus, "finished poll with messagetype %d", ret);
 
   return ret;
 }