gstfunnel: avoid access of freed pad
[platform/upstream/gstreamer.git] / gst / gstbus.c
index e3a15f7..5e663ca 100644 (file)
@@ -64,7 +64,7 @@
  * Note that a #GstPipeline will set its bus into flushing state when changing
  * from READY to NULL state.
  *
- * Last reviewed on 2006-03-12 (0.10.5)
+ * Last reviewed on 2012-03-28 (0.11.3)
  */
 
 #include "gst_private.h"
 #endif
 #include <sys/types.h>
 
+#include "gstatomicqueue.h"
 #include "gstinfo.h"
 #include "gstpoll.h"
 
 #include "gstbus.h"
+#include "glib-compat-private.h"
 
 #define GST_CAT_DEFAULT GST_CAT_BUS
 /* bus signals */
@@ -99,11 +101,19 @@ enum
 
 static void gst_bus_dispose (GObject * object);
 
-static GstObjectClass *parent_class = NULL;
 static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
 
 struct _GstBusPrivate
 {
+  GstAtomicQueue *queue;
+  GMutex queue_lock;
+
+  GstBusSyncHandler sync_handler;
+  gpointer sync_handler_data;
+
+  guint signal_watch_id;
+  guint num_signal_watchers;
+
   guint num_sync_message_emitters;
   GSource *watch_id;
 
@@ -112,6 +122,7 @@ struct _GstBusPrivate
   GPollFD pollfd;
 };
 
+#define gst_bus_parent_class parent_class
 G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT);
 
 static void
@@ -146,8 +157,6 @@ gst_bus_class_init (GstBusClass * klass)
 {
   GObjectClass *gobject_class = (GObjectClass *) klass;
 
-  parent_class = g_type_class_peek_parent (klass);
-
   gobject_class->dispose = gst_bus_dispose;
   gobject_class->set_property = gst_bus_set_property;
   gobject_class->constructed = gst_bus_constructed;
@@ -188,7 +197,7 @@ gst_bus_class_init (GstBusClass * klass)
       g_signal_new ("sync-message", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
       G_STRUCT_OFFSET (GstBusClass, sync_message), NULL, NULL,
-      g_cclosure_marshal_VOID__BOXED, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
+      g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
 
   /**
    * GstBus::message:
@@ -203,7 +212,7 @@ gst_bus_class_init (GstBusClass * klass)
       g_signal_new ("message", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
       G_STRUCT_OFFSET (GstBusClass, message), NULL, NULL,
-      g_cclosure_marshal_VOID__BOXED, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
+      g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
 
   g_type_class_add_private (klass, sizeof (GstBusPrivate));
 }
@@ -211,11 +220,10 @@ gst_bus_class_init (GstBusClass * klass)
 static void
 gst_bus_init (GstBus * bus)
 {
-  bus->queue = gst_atomic_queue_new (32);
-  bus->queue_lock = g_mutex_new ();
-
   bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate);
   bus->priv->enable_async = DEFAULT_ENABLE_ASYNC;
+  g_mutex_init (&bus->priv->queue_lock);
+  bus->priv->queue = gst_atomic_queue_new (32);
 
   GST_DEBUG_OBJECT (bus, "created");
 }
@@ -225,20 +233,19 @@ gst_bus_dispose (GObject * object)
 {
   GstBus *bus = GST_BUS (object);
 
-  if (bus->queue) {
+  if (bus->priv->queue) {
     GstMessage *message;
 
-    g_mutex_lock (bus->queue_lock);
+    g_mutex_lock (&bus->priv->queue_lock);
     do {
-      message = gst_atomic_queue_pop (bus->queue);
+      message = gst_atomic_queue_pop (bus->priv->queue);
       if (message)
         gst_message_unref (message);
     } while (message != NULL);
-    gst_atomic_queue_unref (bus->queue);
-    bus->queue = NULL;
-    g_mutex_unlock (bus->queue_lock);
-    g_mutex_free (bus->queue_lock);
-    bus->queue_lock = NULL;
+    gst_atomic_queue_unref (bus->priv->queue);
+    bus->priv->queue = NULL;
+    g_mutex_unlock (&bus->priv->queue_lock);
+    g_mutex_clear (&bus->priv->queue_lock);
 
     if (bus->priv->poll)
       gst_poll_free (bus->priv->poll);
@@ -297,8 +304,8 @@ gst_bus_post (GstBus * bus, GstMessage * message)
   if (GST_OBJECT_FLAG_IS_SET (bus, GST_BUS_FLUSHING))
     goto is_flushing;
 
-  handler = bus->sync_handler;
-  handler_data = bus->sync_handler_data;
+  handler = bus->priv->sync_handler;
+  handler_data = bus->priv->sync_handler_data;
   emit_sync_message = bus->priv->num_sync_message_emitters > 0;
   GST_OBJECT_UNLOCK (bus);
 
@@ -326,7 +333,7 @@ gst_bus_post (GstBus * bus, GstMessage * message)
     case GST_BUS_PASS:
       /* pass the message to the async queue, refcount passed in the queue */
       GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
-      gst_atomic_queue_push (bus->queue, message);
+      gst_atomic_queue_push (bus->priv->queue, message);
       gst_poll_write_control (bus->priv->poll);
       GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
 
@@ -335,11 +342,11 @@ gst_bus_post (GstBus * bus, GstMessage * message)
     {
       /* async delivery, we need a mutex and a cond to block
        * on */
-      GMutex *lock = g_mutex_new ();
-      GCond *cond = g_cond_new ();
+      GCond *cond = GST_MESSAGE_GET_COND (message);
+      GMutex *lock = GST_MESSAGE_GET_LOCK (message);
 
-      GST_MESSAGE_COND (message) = cond;
-      GST_MESSAGE_GET_LOCK (message) = lock;
+      g_cond_init (cond);
+      g_mutex_init (lock);
 
       GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
 
@@ -348,7 +355,7 @@ gst_bus_post (GstBus * bus, GstMessage * message)
        * the cond will be signalled and we can continue */
       g_mutex_lock (lock);
 
-      gst_atomic_queue_push (bus->queue, message);
+      gst_atomic_queue_push (bus->priv->queue, message);
       gst_poll_write_control (bus->priv->poll);
 
       /* now block till the message is freed */
@@ -357,8 +364,8 @@ gst_bus_post (GstBus * bus, GstMessage * message)
 
       GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
 
-      g_mutex_free (lock);
-      g_cond_free (cond);
+      g_mutex_clear (lock);
+      g_cond_clear (cond);
       break;
     }
     default:
@@ -398,7 +405,7 @@ gst_bus_have_pending (GstBus * bus)
   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
 
   /* see if there is a message on the bus */
-  result = gst_atomic_queue_length (bus->queue) != 0;
+  result = gst_atomic_queue_length (bus->priv->queue) != 0;
 
   return result;
 }
@@ -472,19 +479,21 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
   g_return_val_if_fail (types != 0, NULL);
   g_return_val_if_fail (timeout == 0 || bus->priv->poll != NULL, NULL);
 
-  g_mutex_lock (bus->queue_lock);
+  g_mutex_lock (&bus->priv->queue_lock);
 
   while (TRUE) {
     gint ret;
 
     GST_LOG_OBJECT (bus, "have %d messages",
-        gst_atomic_queue_length (bus->queue));
+        gst_atomic_queue_length (bus->priv->queue));
 
-    while ((message = gst_atomic_queue_pop (bus->queue))) {
+    while ((message = gst_atomic_queue_pop (bus->priv->queue))) {
       if (bus->priv->poll)
         gst_poll_read_control (bus->priv->poll);
-      GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u",
-          message, GST_MESSAGE_TYPE_NAME (message), (guint) types);
+
+      GST_DEBUG_OBJECT (bus, "got message %p, %s from %s, type mask is %u",
+          message, GST_MESSAGE_TYPE_NAME (message),
+          GST_MESSAGE_SRC_NAME (message), (guint) types);
       if ((GST_MESSAGE_TYPE (message) & types) != 0) {
         /* exit the loop, we have a message */
         goto beach;
@@ -515,9 +524,9 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
 
     /* only here in timeout case */
     g_assert (bus->priv->poll);
-    g_mutex_unlock (bus->queue_lock);
+    g_mutex_unlock (&bus->priv->queue_lock);
     ret = gst_poll_wait (bus->priv->poll, timeout - elapsed);
-    g_mutex_lock (bus->queue_lock);
+    g_mutex_lock (&bus->priv->queue_lock);
 
     if (ret == 0) {
       GST_INFO_OBJECT (bus, "timed out, breaking loop");
@@ -529,7 +538,7 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
 
 beach:
 
-  g_mutex_unlock (bus->queue_lock);
+  g_mutex_unlock (&bus->priv->queue_lock);
 
   return message;
 }
@@ -631,11 +640,11 @@ gst_bus_peek (GstBus * bus)
 
   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
 
-  g_mutex_lock (bus->queue_lock);
-  message = gst_atomic_queue_peek (bus->queue);
+  g_mutex_lock (&bus->priv->queue_lock);
+  message = gst_atomic_queue_peek (bus->priv->queue);
   if (message)
     gst_message_ref (message);
-  g_mutex_unlock (bus->queue_lock);
+  g_mutex_unlock (&bus->priv->queue_lock);
 
   GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
 
@@ -667,11 +676,11 @@ gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
 
   /* Assert if the user attempts to replace an existing sync_handler,
    * other than to clear it */
-  if (func != NULL && bus->sync_handler != NULL)
+  if (func != NULL && bus->priv->sync_handler != NULL)
     goto no_replace;
 
-  bus->sync_handler = func;
-  bus->sync_handler_data = data;
+  bus->priv->sync_handler = func;
+  bus->priv->sync_handler_data = data;
   GST_OBJECT_UNLOCK (bus);
 
   return;
@@ -733,7 +742,8 @@ gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
   if (!handler)
     goto no_handler;
 
-  GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
+  GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %" GST_PTR_FORMAT,
+      source, message);
 
   keep = handler (bus, message, user_data);
   gst_message_unref (message);
@@ -797,6 +807,9 @@ gst_bus_create_watch (GstBus * bus)
 
   source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
       sizeof (GstBusSource));
+
+  g_source_set_name ((GSource *) source, "GStreamer message bus watch");
+
   source->bus = gst_object_ref (bus);
   g_source_add_poll ((GSource *) source, &bus->priv->pollfd);
 
@@ -862,7 +875,7 @@ gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority,
  * from @func.
  *
  * Returns: The event source id.
- *
+ * Rename to: gst_bus_add_watch
  * MT safe.
  */
 guint
@@ -881,7 +894,7 @@ gst_bus_add_watch_full (GstBus * bus, gint priority,
 }
 
 /**
- * gst_bus_add_watch:
+ * gst_bus_add_watch: (skip)
  * @bus: a #GstBus to create the watch for
  * @func: A function to call when a message is received.
  * @user_data: user data passed to @func.
@@ -978,7 +991,7 @@ poll_destroy_timeout (GstBusPollData * poll_data)
  * @bus: a #GstBus
  * @events: a mask of #GstMessageType, representing the set of message types to
  * poll for.
- * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll
+ * @timeout: the poll timeout, as a #GstClockTime, or #GST_CLOCK_TIME_NONE to poll
  * indefinitely.
  *
  * Poll the bus for messages. Will block while waiting for messages to come.
@@ -1018,7 +1031,7 @@ poll_destroy_timeout (GstBusPollData * poll_data)
  *     unreffed with gst_message_unref() after usage.
  */
 GstMessage *
-gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
+gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTime timeout)
 {
   GstBusPollData *poll_data;
   GstMessage *ret;
@@ -1030,7 +1043,7 @@ gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
   poll_data->events = events;
   poll_data->message = NULL;
 
-  if (timeout >= 0)
+  if (timeout != GST_CLOCK_TIME_NONE)
     poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
         timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
         (GDestroyNotify) poll_destroy_timeout);
@@ -1169,7 +1182,7 @@ void
 gst_bus_disable_sync_message_emission (GstBus * bus)
 {
   g_return_if_fail (GST_IS_BUS (bus));
-  g_return_if_fail (bus->num_signal_watchers == 0);
+  g_return_if_fail (bus->priv->num_signal_watchers == 0);
 
   GST_OBJECT_LOCK (bus);
   bus->priv->num_sync_message_emitters--;
@@ -1194,8 +1207,8 @@ gst_bus_disable_sync_message_emission (GstBus * bus)
  * responsible for calling gst_bus_remove_signal_watch() as many times as this
  * function is called.
  *
- * There can only be a single bus watch per bus, you most remove all signal watch
- * before you can set another type of watch.
+ * There can only be a single bus watch per bus, you must remove any signal
+ * watch before you can set another type of watch.
  *
  * MT safe.
  */
@@ -1207,22 +1220,22 @@ gst_bus_add_signal_watch_full (GstBus * bus, gint priority)
   /* I know the callees don't take this lock, so go ahead and abuse it */
   GST_OBJECT_LOCK (bus);
 
-  if (bus->num_signal_watchers > 0)
+  if (bus->priv->num_signal_watchers > 0)
     goto done;
 
   /* this should not fail because the counter above takes care of it */
-  g_assert (bus->signal_watch_id == 0);
+  g_assert (bus->priv->signal_watch_id == 0);
 
-  bus->signal_watch_id =
+  bus->priv->signal_watch_id =
       gst_bus_add_watch_full_unlocked (bus, priority, gst_bus_async_signal_func,
       NULL, NULL);
 
-  if (G_UNLIKELY (bus->signal_watch_id == 0))
+  if (G_UNLIKELY (bus->priv->signal_watch_id == 0))
     goto add_failed;
 
 done:
 
-  bus->num_signal_watchers++;
+  bus->priv->num_signal_watchers++;
 
   GST_OBJECT_UNLOCK (bus);
   return;
@@ -1279,16 +1292,16 @@ gst_bus_remove_signal_watch (GstBus * bus)
   /* I know the callees don't take this lock, so go ahead and abuse it */
   GST_OBJECT_LOCK (bus);
 
-  if (bus->num_signal_watchers == 0)
+  if (bus->priv->num_signal_watchers == 0)
     goto error;
 
-  bus->num_signal_watchers--;
+  bus->priv->num_signal_watchers--;
 
-  if (bus->num_signal_watchers > 0)
+  if (bus->priv->num_signal_watchers > 0)
     goto done;
 
-  id = bus->signal_watch_id;
-  bus->signal_watch_id = 0;
+  id = bus->priv->signal_watch_id;
+  bus->priv->signal_watch_id = 0;
 
   GST_DEBUG_OBJECT (bus, "removing signal watch %u", id);