Revert lockfree GstBus for the release
authorTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 26 Apr 2011 14:42:46 +0000 (15:42 +0100)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 26 Apr 2011 15:14:00 +0000 (16:14 +0100)
Drop in old GstBus code for the release to play it safe, since
regressions that are apparently hard to track down and reproduce
have been reported (on windows/OSX mostly) against the lockfree
version, and more time is needed to fix them.

This reverts commit 03391a897001d35d1d290f27dd12e98a8b729fb4.
This reverts commit 43cdbc17e6f944cdf02aeed78d1d5f6bde5190c9.
This reverts commit 80eb160e0f62350271f061daa5f289d9d4277cf4.
This reverts commit c41b0ade28790ffdb0e484b41cd7929c4e145dec.
This reverts commit 874d60e5899dd5b89854679d1a4ad016a58ba4e0.
This reverts commit 79370d4b1781af9c9a65f2d1e3498124d8c4c413.
This reverts commit 2cb3e5235196eb71fb25e0a4a4b8749d6d0a8453.
This reverts commit bd1c40011434c1efaa696dc98ef855ef9cce9b28.
This reverts commit 4bf8f1524f6e3374b3f3bc57322337723d06b928.
This reverts commit 14d7db1b527b05f029819057aef5c123ac7e013d.

https://bugzilla.gnome.org/show_bug.cgi?id=647493

gst/gst_private.h
gst/gstbin.c
gst/gstbus.c
gst/gstbus.h

index da8a50a..2cafbee 100644 (file)
@@ -123,6 +123,7 @@ gboolean  priv_gst_structure_append_to_gstring (const GstStructure * structure,
 gboolean               gst_registry_binary_read_cache  (GstRegistry * registry, const char *location);
 gboolean               gst_registry_binary_write_cache (GstRegistry * registry, const char *location);
 
+
 /* used in gstvalue.c and gststructure.c */
 #define GST_ASCII_IS_STRING(c) (g_ascii_isalnum((c)) || ((c) == '_') || \
     ((c) == '-') || ((c) == '+') || ((c) == '/') || ((c) == ':') || \
index d4375f8..7375821 100644 (file)
@@ -540,7 +540,7 @@ gst_bin_init (GstBin * bin, GstBinClass * klass)
   bin->clock_dirty = FALSE;
 
   /* Set up a bus for listening to child elements */
-  bus = g_object_new (GST_TYPE_BUS, "enable-async", FALSE, NULL);
+  bus = gst_bus_new ();
   bin->child_bus = bus;
   GST_DEBUG_OBJECT (bin, "using bus %" GST_PTR_FORMAT " to listen to children",
       bus);
index caff420..bca4558 100644 (file)
@@ -75,7 +75,6 @@
 #include <sys/types.h>
 
 #include "gstinfo.h"
-#include "gstpoll.h"
 
 #include "gstbus.h"
 
@@ -89,27 +88,19 @@ enum
   LAST_SIGNAL
 };
 
-#define DEFAULT_ENABLE_ASYNC (TRUE)
-
-enum
-{
-  PROP_0,
-  PROP_ENABLE_ASYNC
-};
-
 static void gst_bus_dispose (GObject * object);
 
+static void gst_bus_set_main_context (GstBus * bus, GMainContext * ctx);
+
 static GstObjectClass *parent_class = NULL;
 static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
 
 struct _GstBusPrivate
 {
   guint num_sync_message_emitters;
+  GCond *queue_cond;
   GSource *watch_id;
-
-  gboolean enable_async;
-  GstPoll *poll;
-  GPollFD pollfd;
+  GMainContext *main_context;
 };
 
 G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT);
@@ -143,33 +134,6 @@ marshal_VOID__MINIOBJECT (GClosure * closure, GValue * return_value,
 }
 
 static void
-gst_bus_set_property (GObject * object,
-    guint prop_id, const GValue * value, GParamSpec * pspec)
-{
-  GstBus *bus = GST_BUS_CAST (object);
-
-  switch (prop_id) {
-    case PROP_ENABLE_ASYNC:
-      bus->priv->enable_async = g_value_get_boolean (value);
-      break;
-    default:
-      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
-      break;
-  }
-}
-
-static void
-gst_bus_constructed (GObject * object)
-{
-  GstBus *bus = GST_BUS_CAST (object);
-
-  if (bus->priv->enable_async) {
-    bus->priv->poll = gst_poll_new_timer ();
-    gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd);
-  }
-}
-
-static void
 gst_bus_class_init (GstBusClass * klass)
 {
   GObjectClass *gobject_class = (GObjectClass *) klass;
@@ -177,25 +141,6 @@ gst_bus_class_init (GstBusClass * klass)
   parent_class = g_type_class_peek_parent (klass);
 
   gobject_class->dispose = gst_bus_dispose;
-  gobject_class->set_property = gst_bus_set_property;
-  gobject_class->constructed = gst_bus_constructed;
-
-  /* GstBus:enable-async:
-   *
-   * Enable async message delivery support for bus watches,
-   * gst_bus_pop() and similar API. Without this only the
-   * synchronous message handlers are called.
-   *
-   * This property is used to create the child element buses
-   * in #GstBin.
-   *
-   * Since: 0.10.33
-   */
-  g_object_class_install_property (gobject_class, PROP_ENABLE_ASYNC,
-      g_param_spec_boolean ("enable-async", "Enable Async",
-          "Enable async message delivery for bus watches and gst_bus_pop()",
-          DEFAULT_ENABLE_ASYNC,
-          G_PARAM_CONSTRUCT_ONLY | G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
 
   /**
    * GstBus::sync-message:
@@ -239,11 +184,11 @@ gst_bus_class_init (GstBusClass * klass)
 static void
 gst_bus_init (GstBus * bus)
 {
-  bus->queue = gst_atomic_queue_new (32);
+  bus->queue = g_queue_new ();
   bus->queue_lock = g_mutex_new ();
 
   bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate);
-  bus->priv->enable_async = DEFAULT_ENABLE_ASYNC;
+  bus->priv->queue_cond = g_cond_new ();
 
   GST_DEBUG_OBJECT (bus, "created");
 }
@@ -258,24 +203,63 @@ gst_bus_dispose (GObject * object)
 
     g_mutex_lock (bus->queue_lock);
     do {
-      message = gst_atomic_queue_pop (bus->queue);
+      message = g_queue_pop_head (bus->queue);
       if (message)
         gst_message_unref (message);
     } while (message != NULL);
-    gst_atomic_queue_unref (bus->queue);
+    g_queue_free (bus->queue);
     bus->queue = NULL;
     g_mutex_unlock (bus->queue_lock);
     g_mutex_free (bus->queue_lock);
     bus->queue_lock = NULL;
+    g_cond_free (bus->priv->queue_cond);
+    bus->priv->queue_cond = NULL;
+  }
 
-    if (bus->priv->poll)
-      gst_poll_free (bus->priv->poll);
-    bus->priv->poll = NULL;
+  if (bus->priv->main_context) {
+    g_main_context_unref (bus->priv->main_context);
+    bus->priv->main_context = NULL;
   }
 
   G_OBJECT_CLASS (parent_class)->dispose (object);
 }
 
+static void
+gst_bus_wakeup_main_context (GstBus * bus)
+{
+  GMainContext *ctx;
+
+  GST_OBJECT_LOCK (bus);
+  if ((ctx = bus->priv->main_context))
+    g_main_context_ref (ctx);
+  GST_OBJECT_UNLOCK (bus);
+
+  g_main_context_wakeup (ctx);
+
+  if (ctx)
+    g_main_context_unref (ctx);
+}
+
+static void
+gst_bus_set_main_context (GstBus * bus, GMainContext * ctx)
+{
+  GST_OBJECT_LOCK (bus);
+
+  if (bus->priv->main_context != NULL) {
+    g_main_context_unref (bus->priv->main_context);
+    bus->priv->main_context = NULL;
+  }
+
+  if (ctx != NULL) {
+    bus->priv->main_context = g_main_context_ref (ctx);
+  }
+
+  GST_DEBUG_OBJECT (bus, "setting main context to %p, GLib default context: %p",
+      ctx, g_main_context_default ());
+
+  GST_OBJECT_UNLOCK (bus);
+}
+
 /**
  * gst_bus_new:
  *
@@ -342,11 +326,6 @@ gst_bus_post (GstBus * bus, GstMessage * message)
       && handler != gst_bus_sync_signal_handler)
     gst_bus_sync_signal_handler (bus, message, NULL);
 
-  /* If this is a bus without async message delivery
-   * always drop the message */
-  if (!bus->priv->poll)
-    reply = GST_BUS_DROP;
-
   /* now see what we should do with the message */
   switch (reply) {
     case GST_BUS_DROP:
@@ -356,10 +335,14 @@ gst_bus_post (GstBus * bus, GstMessage * message)
     case GST_BUS_PASS:
       /* pass the message to the async queue, refcount passed in the queue */
       GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
-      gst_atomic_queue_push (bus->queue, message);
-      gst_poll_write_control (bus->priv->poll);
+      g_mutex_lock (bus->queue_lock);
+      g_queue_push_tail (bus->queue, message);
+      g_cond_broadcast (bus->priv->queue_cond);
+      g_mutex_unlock (bus->queue_lock);
       GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
 
+      gst_bus_wakeup_main_context (bus);
+
       break;
     case GST_BUS_ASYNC:
     {
@@ -377,9 +360,12 @@ gst_bus_post (GstBus * bus, GstMessage * message)
        * queue. When the message is handled by the app and destroyed,
        * the cond will be signalled and we can continue */
       g_mutex_lock (lock);
+      g_mutex_lock (bus->queue_lock);
+      g_queue_push_tail (bus->queue, message);
+      g_cond_broadcast (bus->priv->queue_cond);
+      g_mutex_unlock (bus->queue_lock);
 
-      gst_atomic_queue_push (bus->queue, message);
-      gst_poll_write_control (bus->priv->poll);
+      gst_bus_wakeup_main_context (bus);
 
       /* now block till the message is freed */
       g_cond_wait (cond, lock);
@@ -427,8 +413,10 @@ gst_bus_have_pending (GstBus * bus)
 
   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
 
+  g_mutex_lock (bus->queue_lock);
   /* see if there is a message on the bus */
-  result = gst_atomic_queue_length (bus->queue) != 0;
+  result = !g_queue_is_empty (bus->queue);
+  g_mutex_unlock (bus->queue_lock);
 
   return result;
 }
@@ -494,24 +482,18 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
     GstMessageType types)
 {
   GstMessage *message;
-  GTimeVal now, then;
+  GTimeVal *timeval, abstimeout;
   gboolean first_round = TRUE;
 
   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
   g_return_val_if_fail (types != 0, NULL);
-  g_return_val_if_fail (timeout == 0 || bus->priv->poll != NULL, NULL);
 
   g_mutex_lock (bus->queue_lock);
 
   while (TRUE) {
-    gint ret;
+    GST_LOG_OBJECT (bus, "have %d messages", g_queue_get_length (bus->queue));
 
-    GST_LOG_OBJECT (bus, "have %d messages",
-        gst_atomic_queue_length (bus->queue));
-
-    while ((message = gst_atomic_queue_pop (bus->queue))) {
-      if (bus->priv->poll)
-        gst_poll_read_control (bus->priv->poll);
+    while ((message = g_queue_pop_head (bus->queue))) {
       GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u",
           message, GST_MESSAGE_TYPE_NAME (message), (guint) types);
       if ((GST_MESSAGE_TYPE (message) & types) != 0) {
@@ -528,30 +510,28 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
     if (timeout == 0)
       break;
 
-    else if (timeout != GST_CLOCK_TIME_NONE) {
-      if (first_round) {
-        g_get_current_time (&then);
-        first_round = FALSE;
-      } else {
-        GstClockTime elapsed;
-
-        g_get_current_time (&now);
-
-        elapsed = GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (then);
-        if (timeout > elapsed)
-          timeout -= elapsed;
-        else
-          timeout = 0;
-      }
+    if (timeout == GST_CLOCK_TIME_NONE) {
+      /* wait forever */
+      timeval = NULL;
+    } else if (first_round) {
+      glong add = timeout / 1000;
+
+      if (add == 0)
+        /* no need to wait */
+        break;
+
+      /* make timeout absolute */
+      g_get_current_time (&abstimeout);
+      g_time_val_add (&abstimeout, add);
+      timeval = &abstimeout;
+      first_round = FALSE;
+      GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add);
+    } else {
+      /* calculated the absolute end time already, no need to do it again */
+      GST_DEBUG_OBJECT (bus, "blocking for message, again");
+      timeval = &abstimeout;    /* fool compiler */
     }
-
-    /* only here in timeout case */
-    g_assert (bus->priv->poll);
-    g_mutex_unlock (bus->queue_lock);
-    ret = gst_poll_wait (bus->priv->poll, timeout);
-    g_mutex_lock (bus->queue_lock);
-
-    if (ret == 0) {
+    if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) {
       GST_INFO_OBJECT (bus, "timed out, breaking loop");
       break;
     } else {
@@ -664,7 +644,7 @@ gst_bus_peek (GstBus * bus)
   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
 
   g_mutex_lock (bus->queue_lock);
-  message = gst_atomic_queue_peek (bus->queue);
+  message = g_queue_peek_head (bus->queue);
   if (message)
     gst_message_ref (message);
   g_mutex_unlock (bus->queue_lock);
@@ -722,13 +702,24 @@ typedef struct
 {
   GSource source;
   GstBus *bus;
+  gboolean inited;
 } GstBusSource;
 
 static gboolean
 gst_bus_source_prepare (GSource * source, gint * timeout)
 {
+  GstBusSource *bsrc = (GstBusSource *) source;
+
+  /* we do this here now that we know that we're attached to a main context
+   * (we don't support detaching a source from a main context and then
+   * re-attaching it to a different main context) */
+  if (G_UNLIKELY (!bsrc->inited)) {
+    gst_bus_set_main_context (bsrc->bus, g_source_get_context (source));
+    bsrc->inited = TRUE;
+  }
+
   *timeout = -1;
-  return FALSE;
+  return gst_bus_have_pending (bsrc->bus);
 }
 
 static gboolean
@@ -736,7 +727,7 @@ gst_bus_source_check (GSource * source)
 {
   GstBusSource *bsrc = (GstBusSource *) source;
 
-  return bsrc->bus->priv->pollfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR);
+  return gst_bus_have_pending (bsrc->bus);
 }
 
 static gboolean
@@ -798,6 +789,7 @@ gst_bus_source_finalize (GSource * source)
     bus->priv->watch_id = NULL;
   GST_OBJECT_UNLOCK (bus);
 
+  gst_bus_set_main_context (bsource->bus, NULL);
   gst_object_unref (bsource->bus);
   bsource->bus = NULL;
 }
@@ -825,12 +817,11 @@ gst_bus_create_watch (GstBus * bus)
   GstBusSource *source;
 
   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
-  g_return_val_if_fail (bus->priv->poll != NULL, NULL);
 
   source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
       sizeof (GstBusSource));
   source->bus = gst_object_ref (bus);
-  g_source_add_poll ((GSource *) source, &bus->priv->pollfd);
+  source->inited = FALSE;
 
   return (GSource *) source;
 }
index 732591f..30afe60 100644 (file)
@@ -28,7 +28,6 @@ typedef struct _GstBusClass GstBusClass;
 
 #include <gst/gstmessage.h>
 #include <gst/gstclock.h>
-#include <gst/gstatomicqueue.h>
 
 G_BEGIN_DECLS
 
@@ -116,7 +115,7 @@ struct _GstBus
   GstObject         object;
 
   /*< private >*/
-  GstAtomicQueue   *queue;
+  GQueue           *queue;
   GMutex           *queue_lock;
 
   GstBusSyncHandler sync_handler;