docs: convert NULL, TRUE, and FALSE to %NULL, %TRUE, and %FALSE
[platform/upstream/gstreamer.git] / gst / gstbus.c
index b80420b..69cf6b9 100644 (file)
@@ -15,8 +15,8 @@
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 
 /**
@@ -63,8 +63,6 @@
  *
  * Note that a #GstPipeline will set its bus into flushing state when changing
  * from READY to NULL state.
- *
- * Last reviewed on 2006-03-12 (0.10.5)
  */
 
 #include "gst_private.h"
@@ -74,7 +72,9 @@
 #endif
 #include <sys/types.h>
 
+#include "gstatomicqueue.h"
 #include "gstinfo.h"
+#include "gstpoll.h"
 
 #include "gstbus.h"
 #include "glib-compat-private.h"
@@ -89,49 +89,67 @@ enum
   LAST_SIGNAL
 };
 
-static void gst_bus_dispose (GObject * object);
+#define DEFAULT_ENABLE_ASYNC (TRUE)
 
-static void gst_bus_set_main_context (GstBus * bus, GMainContext * ctx);
+enum
+{
+  PROP_0,
+  PROP_ENABLE_ASYNC
+};
+
+static void gst_bus_dispose (GObject * object);
+static void gst_bus_finalize (GObject * object);
 
-static GstObjectClass *parent_class = NULL;
 static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
 
 struct _GstBusPrivate
 {
+  GstAtomicQueue *queue;
+  GMutex queue_lock;
+
+  GstBusSyncHandler sync_handler;
+  gpointer sync_handler_data;
+  GDestroyNotify sync_handler_notify;
+
+  guint signal_watch_id;
+  guint num_signal_watchers;
+
   guint num_sync_message_emitters;
-  GCond *queue_cond;
   GSource *watch_id;
-  GMainContext *main_context;
+
+  gboolean enable_async;
+  GstPoll *poll;
+  GPollFD pollfd;
 };
 
+#define gst_bus_parent_class parent_class
 G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT);
 
-/* fixme: do something about this */
 static void
-marshal_VOID__MINIOBJECT (GClosure * closure, GValue * return_value,
-    guint n_param_values, const GValue * param_values, gpointer invocation_hint,
-    gpointer marshal_data)
+gst_bus_set_property (GObject * object,
+    guint prop_id, const GValue * value, GParamSpec * pspec)
 {
-  typedef void (*marshalfunc_VOID__MINIOBJECT) (gpointer obj, gpointer arg1,
-      gpointer data2);
-  register marshalfunc_VOID__MINIOBJECT callback;
-  register GCClosure *cc = (GCClosure *) closure;
-  register gpointer data1, data2;
+  GstBus *bus = GST_BUS_CAST (object);
 
-  g_return_if_fail (n_param_values == 2);
-
-  if (G_CCLOSURE_SWAP_DATA (closure)) {
-    data1 = closure->data;
-    data2 = g_value_peek_pointer (param_values + 0);
-  } else {
-    data1 = g_value_peek_pointer (param_values + 0);
-    data2 = closure->data;
+  switch (prop_id) {
+    case PROP_ENABLE_ASYNC:
+      bus->priv->enable_async = g_value_get_boolean (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
   }
-  callback =
-      (marshalfunc_VOID__MINIOBJECT) (marshal_data ? marshal_data :
-      cc->callback);
+}
 
-  callback (data1, gst_value_get_mini_object (param_values + 1), data2);
+static void
+gst_bus_constructed (GObject * object)
+{
+  GstBus *bus = GST_BUS_CAST (object);
+
+  if (bus->priv->enable_async) {
+    bus->priv->poll = gst_poll_new_timer ();
+    gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd);
+  }
 }
 
 static void
@@ -139,9 +157,26 @@ gst_bus_class_init (GstBusClass * klass)
 {
   GObjectClass *gobject_class = (GObjectClass *) klass;
 
-  parent_class = g_type_class_peek_parent (klass);
-
   gobject_class->dispose = gst_bus_dispose;
+  gobject_class->finalize = gst_bus_finalize;
+  gobject_class->set_property = gst_bus_set_property;
+  gobject_class->constructed = gst_bus_constructed;
+
+  /**
+   * GstBus::enable-async:
+   *
+   * Enable async message delivery support for bus watches,
+   * gst_bus_pop() and similar API. Without this only the
+   * synchronous message handlers are called.
+   *
+   * This property is used to create the child element buses
+   * in #GstBin.
+   */
+  g_object_class_install_property (gobject_class, PROP_ENABLE_ASYNC,
+      g_param_spec_boolean ("enable-async", "Enable Async",
+          "Enable async message delivery for bus watches and gst_bus_pop()",
+          DEFAULT_ENABLE_ASYNC,
+          G_PARAM_CONSTRUCT_ONLY | G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
 
   /**
    * GstBus::sync-message:
@@ -151,18 +186,14 @@ gst_bus_class_init (GstBusClass * klass)
    * A message has been posted on the bus. This signal is emitted from the
    * thread that posted the message so one has to be careful with locking.
    *
-   * This signal will not be emitted by default, you have to set up
-   * gst_bus_sync_signal_handler() as a sync handler if you want this
-   * signal to be emitted when a message is posted on the bus, like this:
-   * <programlisting>
-   * gst_bus_set_sync_handler (bus, gst_bus_sync_signal_handler, yourdata);
-   * </programlisting>
+   * This signal will not be emitted by default, you have to call
+   * gst_bus_enable_sync_message_emission() before.
    */
   gst_bus_signals[SYNC_MESSAGE] =
       g_signal_new ("sync-message", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
       G_STRUCT_OFFSET (GstBusClass, sync_message), NULL, NULL,
-      marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
+      g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
 
   /**
    * GstBus::message:
@@ -177,7 +208,7 @@ gst_bus_class_init (GstBusClass * klass)
       g_signal_new ("message", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
       G_STRUCT_OFFSET (GstBusClass, message), NULL, NULL,
-      marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
+      g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
 
   g_type_class_add_private (klass, sizeof (GstBusPrivate));
 }
@@ -185,11 +216,13 @@ gst_bus_class_init (GstBusClass * klass)
 static void
 gst_bus_init (GstBus * bus)
 {
-  bus->queue = g_queue_new ();
-  bus->queue_lock = g_mutex_new ();
-
   bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate);
-  bus->priv->queue_cond = g_cond_new ();
+  bus->priv->enable_async = DEFAULT_ENABLE_ASYNC;
+  g_mutex_init (&bus->priv->queue_lock);
+  bus->priv->queue = gst_atomic_queue_new (32);
+
+  /* clear floating flag */
+  gst_object_ref_sink (bus);
 
   GST_DEBUG_OBJECT (bus, "created");
 }
@@ -199,66 +232,37 @@ gst_bus_dispose (GObject * object)
 {
   GstBus *bus = GST_BUS (object);
 
-  if (bus->queue) {
+  if (bus->priv->queue) {
     GstMessage *message;
 
-    g_mutex_lock (bus->queue_lock);
+    g_mutex_lock (&bus->priv->queue_lock);
     do {
-      message = g_queue_pop_head (bus->queue);
+      message = gst_atomic_queue_pop (bus->priv->queue);
       if (message)
         gst_message_unref (message);
     } while (message != NULL);
-    g_queue_free (bus->queue);
-    bus->queue = NULL;
-    g_mutex_unlock (bus->queue_lock);
-    g_mutex_free (bus->queue_lock);
-    bus->queue_lock = NULL;
-    g_cond_free (bus->priv->queue_cond);
-    bus->priv->queue_cond = NULL;
-  }
-
-  if (bus->priv->main_context) {
-    g_main_context_unref (bus->priv->main_context);
-    bus->priv->main_context = NULL;
+    gst_atomic_queue_unref (bus->priv->queue);
+    bus->priv->queue = NULL;
+    g_mutex_unlock (&bus->priv->queue_lock);
+    g_mutex_clear (&bus->priv->queue_lock);
+
+    if (bus->priv->poll)
+      gst_poll_free (bus->priv->poll);
+    bus->priv->poll = NULL;
   }
 
   G_OBJECT_CLASS (parent_class)->dispose (object);
 }
 
 static void
-gst_bus_wakeup_main_context (GstBus * bus)
+gst_bus_finalize (GObject * object)
 {
-  GMainContext *ctx;
-
-  GST_OBJECT_LOCK (bus);
-  if ((ctx = bus->priv->main_context))
-    g_main_context_ref (ctx);
-  GST_OBJECT_UNLOCK (bus);
-
-  g_main_context_wakeup (ctx);
-
-  if (ctx)
-    g_main_context_unref (ctx);
-}
-
-static void
-gst_bus_set_main_context (GstBus * bus, GMainContext * ctx)
-{
-  GST_OBJECT_LOCK (bus);
-
-  if (bus->priv->main_context != NULL) {
-    g_main_context_unref (bus->priv->main_context);
-    bus->priv->main_context = NULL;
-  }
-
-  if (ctx != NULL) {
-    bus->priv->main_context = g_main_context_ref (ctx);
-  }
+  GstBus *bus = GST_BUS (object);
 
-  GST_DEBUG_OBJECT (bus, "setting main context to %p, GLib default context: %p",
-      ctx, g_main_context_default ());
+  if (bus->priv->sync_handler_notify)
+    bus->priv->sync_handler_notify (bus->priv->sync_handler_data);
 
-  GST_OBJECT_UNLOCK (bus);
+  G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
 /**
@@ -287,7 +291,7 @@ gst_bus_new (void)
  * Post a message on the given bus. Ownership of the message
  * is taken by the bus.
  *
- * Returns: TRUE if the message could be posted, FALSE if the bus is flushing.
+ * Returns: %TRUE if the message could be posted, %FALSE if the bus is flushing.
  *
  * MT safe.
  */
@@ -302,18 +306,16 @@ gst_bus_post (GstBus * bus, GstMessage * message)
   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
   g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
 
-  GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus, type %s, %" GST_PTR_FORMAT
-      " from source %" GST_PTR_FORMAT,
-      message, GST_MESSAGE_TYPE_NAME (message), message->structure,
-      message->src);
+  GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus %" GST_PTR_FORMAT, message,
+      message);
 
   GST_OBJECT_LOCK (bus);
   /* check if the bus is flushing */
   if (GST_OBJECT_FLAG_IS_SET (bus, GST_BUS_FLUSHING))
     goto is_flushing;
 
-  handler = bus->sync_handler;
-  handler_data = bus->sync_handler_data;
+  handler = bus->priv->sync_handler;
+  handler_data = bus->priv->sync_handler_data;
   emit_sync_message = bus->priv->num_sync_message_emitters > 0;
   GST_OBJECT_UNLOCK (bus);
 
@@ -327,6 +329,11 @@ gst_bus_post (GstBus * bus, GstMessage * message)
       && handler != gst_bus_sync_signal_handler)
     gst_bus_sync_signal_handler (bus, message, NULL);
 
+  /* If this is a bus without async message delivery
+   * always drop the message */
+  if (!bus->priv->poll)
+    reply = GST_BUS_DROP;
+
   /* now see what we should do with the message */
   switch (reply) {
     case GST_BUS_DROP:
@@ -336,24 +343,20 @@ gst_bus_post (GstBus * bus, GstMessage * message)
     case GST_BUS_PASS:
       /* pass the message to the async queue, refcount passed in the queue */
       GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
-      g_mutex_lock (bus->queue_lock);
-      g_queue_push_tail (bus->queue, message);
-      g_cond_broadcast (bus->priv->queue_cond);
-      g_mutex_unlock (bus->queue_lock);
+      gst_atomic_queue_push (bus->priv->queue, message);
+      gst_poll_write_control (bus->priv->poll);
       GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
 
-      gst_bus_wakeup_main_context (bus);
-
       break;
     case GST_BUS_ASYNC:
     {
       /* async delivery, we need a mutex and a cond to block
        * on */
-      GMutex *lock = g_mutex_new ();
-      GCond *cond = g_cond_new ();
+      GCond *cond = GST_MESSAGE_GET_COND (message);
+      GMutex *lock = GST_MESSAGE_GET_LOCK (message);
 
-      GST_MESSAGE_COND (message) = cond;
-      GST_MESSAGE_GET_LOCK (message) = lock;
+      g_cond_init (cond);
+      g_mutex_init (lock);
 
       GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
 
@@ -361,12 +364,9 @@ gst_bus_post (GstBus * bus, GstMessage * message)
        * queue. When the message is handled by the app and destroyed,
        * the cond will be signalled and we can continue */
       g_mutex_lock (lock);
-      g_mutex_lock (bus->queue_lock);
-      g_queue_push_tail (bus->queue, message);
-      g_cond_broadcast (bus->priv->queue_cond);
-      g_mutex_unlock (bus->queue_lock);
 
-      gst_bus_wakeup_main_context (bus);
+      gst_atomic_queue_push (bus->priv->queue, message);
+      gst_poll_write_control (bus->priv->poll);
 
       /* now block till the message is freed */
       g_cond_wait (cond, lock);
@@ -374,8 +374,8 @@ gst_bus_post (GstBus * bus, GstMessage * message)
 
       GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
 
-      g_mutex_free (lock);
-      g_cond_free (cond);
+      g_mutex_clear (lock);
+      g_cond_clear (cond);
       break;
     }
     default:
@@ -402,7 +402,7 @@ is_flushing:
  * Check if there are pending messages on the bus that
  * should be handled.
  *
- * Returns: TRUE if there are messages on the bus to be handled, FALSE
+ * Returns: %TRUE if there are messages on the bus to be handled, %FALSE
  * otherwise.
  *
  * MT safe.
@@ -414,10 +414,8 @@ gst_bus_have_pending (GstBus * bus)
 
   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
 
-  g_mutex_lock (bus->queue_lock);
   /* see if there is a message on the bus */
-  result = !g_queue_is_empty (bus->queue);
-  g_mutex_unlock (bus->queue_lock);
+  result = gst_atomic_queue_length (bus->priv->queue) != 0;
 
   return result;
 }
@@ -429,7 +427,7 @@ gst_bus_have_pending (GstBus * bus)
  *
  * If @flushing, flush out and unref any messages queued in the bus. Releases
  * references to the message origin objects. Will flush future messages until
- * gst_bus_set_flushing() sets @flushing to #FALSE.
+ * gst_bus_set_flushing() sets @flushing to %FALSE.
  *
  * MT safe.
  */
@@ -470,70 +468,80 @@ gst_bus_set_flushing (GstBus * bus, gboolean flushing)
  * matching message was posted on the bus.
  *
  * Returns: (transfer full): a #GstMessage matching the filter in @types,
- *     or NULL if no matching message was found on the bus until the timeout
+ *     or %NULL if no matching message was found on the bus until the timeout
  *     expired. The message is taken from the bus and needs to be unreffed
  *     with gst_message_unref() after usage.
  *
  * MT safe.
- *
- * Since: 0.10.15
  */
 GstMessage *
 gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
     GstMessageType types)
 {
   GstMessage *message;
-  GTimeVal *timeval, abstimeout;
+  GTimeVal now, then;
   gboolean first_round = TRUE;
+  GstClockTime elapsed = 0;
 
   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
   g_return_val_if_fail (types != 0, NULL);
+  g_return_val_if_fail (timeout == 0 || bus->priv->poll != NULL, NULL);
 
-  g_mutex_lock (bus->queue_lock);
+  g_mutex_lock (&bus->priv->queue_lock);
 
   while (TRUE) {
-    GST_LOG_OBJECT (bus, "have %d messages", g_queue_get_length (bus->queue));
+    gint ret;
+
+    GST_LOG_OBJECT (bus, "have %d messages",
+        gst_atomic_queue_length (bus->priv->queue));
+
+    while ((message = gst_atomic_queue_pop (bus->priv->queue))) {
+      if (bus->priv->poll)
+        gst_poll_read_control (bus->priv->poll);
 
-    while ((message = g_queue_pop_head (bus->queue))) {
       GST_DEBUG_OBJECT (bus, "got message %p, %s from %s, type mask is %u",
           message, GST_MESSAGE_TYPE_NAME (message),
           GST_MESSAGE_SRC_NAME (message), (guint) types);
       if ((GST_MESSAGE_TYPE (message) & types) != 0) {
-        /* exit the loop, we have a message */
-        goto beach;
-      } else {
-        GST_DEBUG_OBJECT (bus, "discarding message, does not match mask");
-        gst_message_unref (message);
-        message = NULL;
+        /* Extra check to ensure extended types don't get matched unless
+         * asked for */
+        if ((GST_MESSAGE_TYPE_IS_EXTENDED (message) == FALSE)
+            || (types & GST_MESSAGE_EXTENDED)) {
+          /* exit the loop, we have a message */
+          goto beach;
+        }
       }
+
+      GST_DEBUG_OBJECT (bus, "discarding message, does not match mask");
+      gst_message_unref (message);
+      message = NULL;
     }
 
     /* no need to wait, exit loop */
     if (timeout == 0)
       break;
 
-    if (timeout == GST_CLOCK_TIME_NONE) {
-      /* wait forever */
-      timeval = NULL;
-    } else if (first_round) {
-      glong add = timeout / 1000;
-
-      if (add == 0)
-        /* no need to wait */
-        break;
-
-      /* make timeout absolute */
-      g_get_current_time (&abstimeout);
-      g_time_val_add (&abstimeout, add);
-      timeval = &abstimeout;
-      first_round = FALSE;
-      GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add);
-    } else {
-      /* calculated the absolute end time already, no need to do it again */
-      GST_DEBUG_OBJECT (bus, "blocking for message, again");
-      timeval = &abstimeout;    /* fool compiler */
+    else if (timeout != GST_CLOCK_TIME_NONE) {
+      if (first_round) {
+        g_get_current_time (&then);
+        first_round = FALSE;
+      } else {
+        g_get_current_time (&now);
+
+        elapsed = GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (then);
+
+        if (elapsed > timeout)
+          break;
+      }
     }
-    if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) {
+
+    /* only here in timeout case */
+    g_assert (bus->priv->poll);
+    g_mutex_unlock (&bus->priv->queue_lock);
+    ret = gst_poll_wait (bus->priv->poll, timeout - elapsed);
+    g_mutex_lock (&bus->priv->queue_lock);
+
+    if (ret == 0) {
       GST_INFO_OBJECT (bus, "timed out, breaking loop");
       break;
     } else {
@@ -543,7 +551,7 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
 
 beach:
 
-  g_mutex_unlock (bus->queue_lock);
+  g_mutex_unlock (&bus->priv->queue_lock);
 
   return message;
 }
@@ -561,13 +569,11 @@ beach:
  * posted on the bus.
  *
  * Returns: (transfer full): the #GstMessage that is on the bus after the
- *     specified timeout or NULL if the bus is empty after the timeout expired.
+ *     specified timeout or %NULL if the bus is empty after the timeout expired.
  * The message is taken from the bus and needs to be unreffed with
  * gst_message_unref() after usage.
  *
  * MT safe.
- *
- * Since: 0.10.12
  */
 GstMessage *
 gst_bus_timed_pop (GstBus * bus, GstClockTime timeout)
@@ -585,16 +591,15 @@ gst_bus_timed_pop (GstBus * bus, GstClockTime timeout)
  * Get a message matching @type from the bus.  Will discard all messages on
  * the bus that do not match @type and that have been posted before the first
  * message that does match @type.  If there is no message matching @type on
- * the bus, all messages will be discarded.
+ * the bus, all messages will be discarded. It is not possible to use message
+ * enums beyond #GST_MESSAGE_EXTENDED in the @events mask.
  *
  * Returns: (transfer full): the next #GstMessage matching @type that is on
- *     the bus, or NULL if the bus is empty or there is no message matching
+ *     the bus, or %NULL if the bus is empty or there is no message matching
  *     @type. The message is taken from the bus and needs to be unreffed with
  *     gst_message_unref() after usage.
  *
  * MT safe.
- *
- * Since: 0.10.15
  */
 GstMessage *
 gst_bus_pop_filtered (GstBus * bus, GstMessageType types)
@@ -611,7 +616,7 @@ gst_bus_pop_filtered (GstBus * bus, GstMessageType types)
  *
  * Get a message from the bus.
  *
- * Returns: (transfer full): the #GstMessage that is on the bus, or NULL if the
+ * Returns: (transfer full): the #GstMessage that is on the bus, or %NULL if the
  *     bus is empty. The message is taken from the bus and needs to be unreffed
  *     with gst_message_unref() after usage.
  *
@@ -633,7 +638,7 @@ gst_bus_pop (GstBus * bus)
  * on the bus' message queue. A reference is returned, and needs to be unreffed
  * by the caller.
  *
- * Returns: (transfer full): the #GstMessage that is on the bus, or NULL if the
+ * Returns: (transfer full): the #GstMessage that is on the bus, or %NULL if the
  *     bus is empty.
  *
  * MT safe.
@@ -645,11 +650,11 @@ gst_bus_peek (GstBus * bus)
 
   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
 
-  g_mutex_lock (bus->queue_lock);
-  message = g_queue_peek_head (bus->queue);
+  g_mutex_lock (&bus->priv->queue_lock);
+  message = gst_atomic_queue_peek (bus->priv->queue);
   if (message)
     gst_message_ref (message);
-  g_mutex_unlock (bus->queue_lock);
+  g_mutex_unlock (&bus->priv->queue_lock);
 
   GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
 
@@ -659,8 +664,9 @@ gst_bus_peek (GstBus * bus)
 /**
  * gst_bus_set_sync_handler:
  * @bus: a #GstBus to install the handler on
- * @func: The handler function to install
- * @data: User data that will be sent to the handler function.
+ * @func: (allow-none): The handler function to install
+ * @user_data: User data that will be sent to the handler function.
+ * @notify: called when @user_data becomes unused
  *
  * Sets the synchronous handler on the bus. The function will be called
  * every time a new message is posted on the bus. Note that the function
@@ -669,23 +675,37 @@ gst_bus_peek (GstBus * bus)
  * should handle messages asynchronously using the gst_bus watch and poll
  * functions.
  *
- * You cannot replace an existing sync_handler. You can pass NULL to this
+ * You cannot replace an existing sync_handler. You can pass %NULL to this
  * function, which will clear the existing handler.
  */
 void
-gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
+gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func,
+    gpointer user_data, GDestroyNotify notify)
 {
+  GDestroyNotify old_notify;
+
   g_return_if_fail (GST_IS_BUS (bus));
 
   GST_OBJECT_LOCK (bus);
-
   /* Assert if the user attempts to replace an existing sync_handler,
    * other than to clear it */
-  if (func != NULL && bus->sync_handler != NULL)
+  if (func != NULL && bus->priv->sync_handler != NULL)
     goto no_replace;
 
-  bus->sync_handler = func;
-  bus->sync_handler_data = data;
+  if ((old_notify = bus->priv->sync_handler_notify)) {
+    gpointer old_data = bus->priv->sync_handler_data;
+
+    bus->priv->sync_handler_data = NULL;
+    bus->priv->sync_handler_notify = NULL;
+    GST_OBJECT_UNLOCK (bus);
+
+    old_notify (old_data);
+
+    GST_OBJECT_LOCK (bus);
+  }
+  bus->priv->sync_handler = func;
+  bus->priv->sync_handler_data = user_data;
+  bus->priv->sync_handler_notify = notify;
   GST_OBJECT_UNLOCK (bus);
 
   return;
@@ -704,24 +724,13 @@ typedef struct
 {
   GSource source;
   GstBus *bus;
-  gboolean inited;
 } GstBusSource;
 
 static gboolean
 gst_bus_source_prepare (GSource * source, gint * timeout)
 {
-  GstBusSource *bsrc = (GstBusSource *) source;
-
-  /* we do this here now that we know that we're attached to a main context
-   * (we don't support detaching a source from a main context and then
-   * re-attaching it to a different main context) */
-  if (G_UNLIKELY (!bsrc->inited)) {
-    gst_bus_set_main_context (bsrc->bus, g_source_get_context (source));
-    bsrc->inited = TRUE;
-  }
-
   *timeout = -1;
-  return gst_bus_have_pending (bsrc->bus);
+  return FALSE;
 }
 
 static gboolean
@@ -729,7 +738,7 @@ gst_bus_source_check (GSource * source)
 {
   GstBusSource *bsrc = (GstBusSource *) source;
 
-  return gst_bus_have_pending (bsrc->bus);
+  return bsrc->bus->priv->pollfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR);
 }
 
 static gboolean
@@ -792,7 +801,6 @@ gst_bus_source_finalize (GSource * source)
     bus->priv->watch_id = NULL;
   GST_OBJECT_UNLOCK (bus);
 
-  gst_bus_set_main_context (bsource->bus, NULL);
   gst_object_unref (bsource->bus);
   bsource->bus = NULL;
 }
@@ -820,16 +828,15 @@ gst_bus_create_watch (GstBus * bus)
   GstBusSource *source;
 
   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
+  g_return_val_if_fail (bus->priv->poll != NULL, NULL);
 
   source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
       sizeof (GstBusSource));
 
-#if GLIB_CHECK_VERSION(2,26,0)
   g_source_set_name ((GSource *) source, "GStreamer message bus watch");
-#endif
 
   source->bus = gst_object_ref (bus);
-  source->inited = FALSE;
+  g_source_add_poll ((GSource *) source, &bus->priv->pollfd);
 
   return (GSource *) source;
 }
@@ -877,8 +884,8 @@ gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority,
  * @notify: the function to call when the source is removed.
  *
  * Adds a bus watch to the default main context with the given @priority (e.g.
- * %G_PRIORITY_DEFAULT). Since 0.10.33 it is also possible to use a non-default
- * main context set up using g_main_context_push_thread_default() (before
+ * %G_PRIORITY_DEFAULT). It is also possible to use a non-default  main
+ * context set up using g_main_context_push_thread_default() (before
  * one had to create a bus watch source and attach it to the desired main
  * context 'manually').
  *
@@ -889,12 +896,13 @@ gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority,
  * When @func is called, the message belongs to the caller; if you want to
  * keep a copy of it, call gst_message_ref() before leaving @func.
  *
- * The watch can be removed using g_source_remove() or by returning FALSE
+ * The watch can be removed using g_source_remove() or by returning %FALSE
  * from @func.
  *
- * Returns: The event source id.
- *
  * MT safe.
+ *
+ * Returns: The event source id.
+ * Rename to: gst_bus_add_watch
  */
 guint
 gst_bus_add_watch_full (GstBus * bus, gint priority,
@@ -912,14 +920,14 @@ gst_bus_add_watch_full (GstBus * bus, gint priority,
 }
 
 /**
- * gst_bus_add_watch:
+ * gst_bus_add_watch: (skip)
  * @bus: a #GstBus to create the watch for
  * @func: A function to call when a message is received.
  * @user_data: user data passed to @func.
  *
  * Adds a bus watch to the default main context with the default priority
- * (%G_PRIORITY_DEFAULT). Since 0.10.33 it is also possible to use a non-default
- * main context set up using g_main_context_push_thread_default() (before
+ * (%G_PRIORITY_DEFAULT). It is also possible to use a non-default main
+ * context set up using g_main_context_push_thread_default() (before
  * one had to create a bus watch source and attach it to the desired main
  * context 'manually').
  *
@@ -927,7 +935,7 @@ gst_bus_add_watch_full (GstBus * bus, gint priority,
  * There can only be a single bus watch per bus, you must remove it before you
  * can set a new one.
  *
- * The watch can be removed using g_source_remove() or by returning FALSE
+ * The watch can be removed using g_source_remove() or by returning %FALSE
  * from @func.
  *
  * Returns: The event source id.
@@ -1008,8 +1016,8 @@ poll_destroy_timeout (GstBusPollData * poll_data)
  * gst_bus_poll:
  * @bus: a #GstBus
  * @events: a mask of #GstMessageType, representing the set of message types to
- * poll for.
- * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll
+ * poll for (note special handling of extended message types below)
+ * @timeout: the poll timeout, as a #GstClockTime, or #GST_CLOCK_TIME_NONE to poll
  * indefinitely.
  *
  * Poll the bus for messages. Will block while waiting for messages to come.
@@ -1017,6 +1025,8 @@ poll_destroy_timeout (GstBusPollData * poll_data)
  * @timeout is negative, this function will block indefinitely.
  *
  * All messages not in @events will be popped off the bus and will be ignored.
+ * It is not possible to use message enums beyond #GST_MESSAGE_EXTENDED in the
+ * @events mask
  *
  * Because poll is implemented using the "message" signal enabled by
  * gst_bus_add_signal_watch(), calling gst_bus_poll() will cause the "message"
@@ -1044,12 +1054,12 @@ poll_destroy_timeout (GstBusPollData * poll_data)
  * better handled by setting up an asynchronous bus watch and doing things
  * from there.
  *
- * Returns: (transfer full): the message that was received, or NULL if the
+ * Returns: (transfer full): the message that was received, or %NULL if the
  *     poll timed out. The message is taken from the bus and needs to be
  *     unreffed with gst_message_unref() after usage.
  */
 GstMessage *
-gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
+gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTime timeout)
 {
   GstBusPollData *poll_data;
   GstMessage *ret;
@@ -1061,7 +1071,7 @@ gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
   poll_data->events = events;
   poll_data->message = NULL;
 
-  if (timeout >= 0)
+  if (timeout != GST_CLOCK_TIME_NONE)
     poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
         timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
         (GDestroyNotify) poll_destroy_timeout);
@@ -1103,7 +1113,7 @@ gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
  * A helper #GstBusFunc that can be used to convert all asynchronous messages
  * into signals.
  *
- * Returns: TRUE
+ * Returns: %TRUE
  */
 gboolean
 gst_bus_async_signal_func (GstBus * bus, GstMessage * message, gpointer data)
@@ -1200,7 +1210,7 @@ void
 gst_bus_disable_sync_message_emission (GstBus * bus)
 {
   g_return_if_fail (GST_IS_BUS (bus));
-  g_return_if_fail (bus->num_signal_watchers == 0);
+  g_return_if_fail (bus->priv->num_sync_message_emitters > 0);
 
   GST_OBJECT_LOCK (bus);
   bus->priv->num_sync_message_emitters--;
@@ -1213,8 +1223,8 @@ gst_bus_disable_sync_message_emission (GstBus * bus)
  * @priority: The priority of the watch.
  *
  * Adds a bus signal watch to the default main context with the given @priority
- * (e.g. %G_PRIORITY_DEFAULT). Since 0.10.33 it is also possible to use a
- * non-default main context set up using g_main_context_push_thread_default()
+ * (e.g. %G_PRIORITY_DEFAULT). It is also possible to use a non-default main
+ * context set up using g_main_context_push_thread_default()
  * (before one had to create a bus watch source and attach it to the desired
  * main context 'manually').
  *
@@ -1238,22 +1248,22 @@ gst_bus_add_signal_watch_full (GstBus * bus, gint priority)
   /* I know the callees don't take this lock, so go ahead and abuse it */
   GST_OBJECT_LOCK (bus);
 
-  if (bus->num_signal_watchers > 0)
+  if (bus->priv->num_signal_watchers > 0)
     goto done;
 
   /* this should not fail because the counter above takes care of it */
-  g_assert (bus->signal_watch_id == 0);
+  g_assert (bus->priv->signal_watch_id == 0);
 
-  bus->signal_watch_id =
+  bus->priv->signal_watch_id =
       gst_bus_add_watch_full_unlocked (bus, priority, gst_bus_async_signal_func,
       NULL, NULL);
 
-  if (G_UNLIKELY (bus->signal_watch_id == 0))
+  if (G_UNLIKELY (bus->priv->signal_watch_id == 0))
     goto add_failed;
 
 done:
 
-  bus->num_signal_watchers++;
+  bus->priv->num_signal_watchers++;
 
   GST_OBJECT_UNLOCK (bus);
   return;
@@ -1272,7 +1282,7 @@ add_failed:
  * @bus: a #GstBus on which you want to receive the "message" signal
  *
  * Adds a bus signal watch to the default main context with the default priority
- * (%G_PRIORITY_DEFAULT). Since 0.10.33 it is also possible to use a non-default
+ * (%G_PRIORITY_DEFAULT). It is also possible to use a non-default
  * main context set up using g_main_context_push_thread_default() (before
  * one had to create a bus watch source and attach it to the desired main
  * context 'manually').
@@ -1310,16 +1320,16 @@ gst_bus_remove_signal_watch (GstBus * bus)
   /* I know the callees don't take this lock, so go ahead and abuse it */
   GST_OBJECT_LOCK (bus);
 
-  if (bus->num_signal_watchers == 0)
+  if (bus->priv->num_signal_watchers == 0)
     goto error;
 
-  bus->num_signal_watchers--;
+  bus->priv->num_signal_watchers--;
 
-  if (bus->num_signal_watchers > 0)
+  if (bus->priv->num_signal_watchers > 0)
     goto done;
 
-  id = bus->signal_watch_id;
-  bus->signal_watch_id = 0;
+  id = bus->priv->signal_watch_id;
+  bus->priv->signal_watch_id = 0;
 
   GST_DEBUG_OBJECT (bus, "removing signal watch %u", id);