gst/gstbus.*: Implement a real GSource and use g_main_context_wakeup() to signal...
authorWim Taymans <wim.taymans@gmail.com>
Thu, 26 May 2005 10:48:53 +0000 (10:48 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 26 May 2005 10:48:53 +0000 (10:48 +0000)
Original commit message from CVS:
* gst/gstbus.c: (gst_bus_init), (gst_bus_dispose), (gst_bus_post),
(gst_bus_pop), (gst_bus_source_prepare), (gst_bus_source_check),
(gst_bus_source_dispatch), (gst_bus_source_finalize),
(gst_bus_create_watch), (gst_bus_add_watch_full):
* gst/gstbus.h:
Implement a real GSource and use g_main_context_wakeup() to
signal new messages instead of the socketpair.

ChangeLog
gst/gstbus.c
gst/gstbus.h

index fca4155..4e77295 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,13 @@
+2005-05-26  Wim Taymans  <wim@fluendo.com>
+
+       * gst/gstbus.c: (gst_bus_init), (gst_bus_dispose), (gst_bus_post),
+       (gst_bus_pop), (gst_bus_source_prepare), (gst_bus_source_check),
+       (gst_bus_source_dispatch), (gst_bus_source_finalize),
+       (gst_bus_create_watch), (gst_bus_add_watch_full):
+       * gst/gstbus.h:
+       Implement a real GSource and use g_main_context_wakeup() to
+       signal new messages instead of the socketpair.
+
 2005-05-25  Wim Taymans  <wim@fluendo.com>
 
        * gst/gstbin.c: (bin_element_is_sink), (has_ancestor),
index 07f4d2b..f08e5cc 100644 (file)
@@ -97,19 +97,7 @@ gst_bus_init (GstBus * bus)
   bus->queue = g_queue_new ();
   bus->queue_lock = g_mutex_new ();
 
-  if (socketpair (PF_UNIX, SOCK_STREAM, 0, bus->control_socket) < 0)
-    goto no_socketpair;
-
-  bus->io_channel = g_io_channel_unix_new (bus->control_socket[0]);
-
   return;
-
-  /* errors */
-no_socketpair:
-  {
-    g_warning ("cannot create io channel");
-    bus->io_channel = NULL;
-  }
 }
 
 static void
@@ -119,14 +107,6 @@ gst_bus_dispose (GObject * object)
 
   bus = GST_BUS (object);
 
-  if (bus->io_channel) {
-    g_io_channel_shutdown (bus->io_channel, TRUE, NULL);
-    g_io_channel_unref (bus->io_channel);
-    bus->io_channel = NULL;
-  }
-  close (bus->control_socket[0]);
-  close (bus->control_socket[1]);
-
   if (bus->queue) {
     g_mutex_lock (bus->queue_lock);
     g_queue_free (bus->queue);
@@ -193,12 +173,9 @@ gst_bus_new (void)
 gboolean
 gst_bus_post (GstBus * bus, GstMessage * message)
 {
-  gchar c;
   GstBusSyncReply reply = GST_BUS_PASS;
   GstBusSyncHandler handler;
   gpointer handler_data;
-  gboolean need_write = FALSE;
-  ssize_t write_ret = -1;
 
   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
   g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
@@ -207,7 +184,6 @@ gst_bus_post (GstBus * bus, GstMessage * message)
       GST_MESSAGE_TYPE (message));
 
   GST_LOCK (bus);
-
   if (GST_FLAG_IS_SET (bus, GST_BUS_FLUSHING)) {
     gst_message_unref (message);
     GST_UNLOCK (bus);
@@ -232,26 +208,12 @@ gst_bus_post (GstBus * bus, GstMessage * message)
     case GST_BUS_PASS:
       /* pass the message to the async queue */
       g_mutex_lock (bus->queue_lock);
-      if (g_queue_get_length (bus->queue) == 0)
-        need_write = TRUE;
       g_queue_push_tail (bus->queue, message);
       g_mutex_unlock (bus->queue_lock);
 
-      if (need_write) {
-        c = 'p';
-        errno = EAGAIN;
-        while (write_ret == -1) {
-          switch (errno) {
-            case EAGAIN:
-            case EINTR:
-              break;
-            default:
-              perror ("gst_bus_post: could not write to fd");
-              return FALSE;
-          }
-          write_ret = write (bus->control_socket[1], &c, 1);
-        }
-      }
+      /* FIXME cannot assume the source is only in the default context */
+      g_main_context_wakeup (NULL);
+
       break;
     case GST_BUS_ASYNC:
     {
@@ -270,26 +232,11 @@ gst_bus_post (GstBus * bus, GstMessage * message)
        * the cond will be signalled and we can continue */
       g_mutex_lock (lock);
       g_mutex_lock (bus->queue_lock);
-      if (g_queue_get_length (bus->queue) == 0)
-        need_write = TRUE;
       g_queue_push_tail (bus->queue, message);
       g_mutex_unlock (bus->queue_lock);
 
-      if (need_write) {
-        c = 'p';
-        errno = EAGAIN;
-        while (write_ret == -1) {
-          switch (errno) {
-            case EAGAIN:
-            case EINTR:
-              break;
-            default:
-              perror ("gst_bus_post: could not write to fd");
-              return FALSE;
-          }
-          write_ret = write (bus->control_socket[1], &c, 1);
-        }
-      }
+      /* FIXME cannot assume the source is only in the default context */
+      g_main_context_wakeup (NULL);
 
       /* now block till the message is freed */
       g_cond_wait (cond, lock);
@@ -375,36 +322,13 @@ GstMessage *
 gst_bus_pop (GstBus * bus)
 {
   GstMessage *message;
-  gboolean needs_read = FALSE;
 
   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
 
   g_mutex_lock (bus->queue_lock);
   message = g_queue_pop_head (bus->queue);
-  if (message && g_queue_get_length (bus->queue) == 0)
-    needs_read = TRUE;
   g_mutex_unlock (bus->queue_lock);
 
-  if (needs_read) {
-    gchar c;
-    ssize_t read_ret = -1;
-
-    /* the char in the fd is essentially just a way to wake us up. read it off so
-       we're not woken up again. */
-    errno = EAGAIN;
-    while (read_ret == -1) {
-      switch (errno) {
-        case EAGAIN:
-        case EINTR:
-          break;
-        default:
-          perror ("gst_bus_pop: could not read from fd");
-          return NULL;
-      }
-      read_ret = read (bus->control_socket[0], &c, 1);
-    }
-  }
-
   return message;
 }
 
@@ -454,68 +378,92 @@ gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
   GST_UNLOCK (bus);
 }
 
-/**
- * gst_bus_create_watch:
- * @bus: a #GstBus to create the watch for
- *
- * Create watch for this bus. 
- *
- * Returns: A #GSource that can be added to a mainloop.
+/* GSource for the bus
  */
-GSource *
-gst_bus_create_watch (GstBus * bus)
+typedef struct
 {
-  GSource *source;
-
-  g_return_val_if_fail (GST_IS_BUS (bus), NULL);
-
-  /* FIXME, we need to ref the bus and unref it when the source
-   * is destroyed */
-  source = g_io_create_watch (bus->io_channel, G_IO_IN);
+  GSource source;
+  GstBus *bus;
+} GstBusSource;
 
-  return source;
+gboolean
+gst_bus_source_prepare (GSource * source, gint * timeout)
+{
+  *timeout = -1;
+  return gst_bus_have_pending (((GstBusSource *) source)->bus);
 }
 
-typedef struct
+gboolean
+gst_bus_source_check (GSource * source)
 {
-  GSource *source;
-  GstBus *bus;
-  gint priority;
-  GstBusHandler handler;
-  gpointer user_data;
-  GDestroyNotify notify;
-} GstBusWatch;
+  return gst_bus_have_pending (((GstBusSource *) source)->bus);
+}
 
-static gboolean
-bus_watch_callback (GIOChannel * channel, GIOCondition cond,
-    GstBusWatch * watch)
+gboolean
+gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
+    gpointer user_data)
 {
+  GstBusHandler handler = (GstBusHandler) callback;
+  GstBusSource *bsource = (GstBusSource *) source;
   GstMessage *message;
   gboolean needs_pop = TRUE;
 
-  g_return_val_if_fail (GST_IS_BUS (watch->bus), FALSE);
+  g_return_val_if_fail (GST_IS_BUS (bsource->bus), FALSE);
 
-  message = gst_bus_peek (watch->bus);
+  message = gst_bus_peek (bsource->bus);
 
   g_return_val_if_fail (message != NULL, TRUE);
 
-  if (watch->handler)
-    needs_pop = watch->handler (watch->bus, message, watch->user_data);
+  if (!handler) {
+    g_warning ("GstBus watch dispatched without callback\n"
+        "You must call g_source_connect().");
+    return FALSE;
+  }
+
+  needs_pop = handler (bsource->bus, message, user_data);
 
   if (needs_pop)
-    gst_message_unref (gst_bus_pop (watch->bus));
+    gst_message_unref (gst_bus_pop (bsource->bus));
 
   return TRUE;
 }
 
-static void
-bus_watch_destroy (GstBusWatch * watch)
+void
+gst_bus_source_finalize (GSource * source)
 {
-  if (watch->notify) {
-    watch->notify (watch->user_data);
-  }
-  gst_object_unref (GST_OBJECT_CAST (watch->bus));
-  g_free (watch);
+  GstBusSource *bsource = (GstBusSource *) source;
+
+  gst_object_unref (GST_OBJECT_CAST (bsource->bus));
+}
+
+static GSourceFuncs gst_bus_source_funcs = {
+  gst_bus_source_prepare,
+  gst_bus_source_check,
+  gst_bus_source_dispatch,
+  gst_bus_source_finalize
+};
+
+/**
+ * gst_bus_create_watch:
+ * @bus: a #GstBus to create the watch for
+ *
+ * Create watch for this bus. 
+ *
+ * Returns: A #GSource that can be added to a mainloop.
+ */
+GSource *
+gst_bus_create_watch (GstBus * bus)
+{
+  GstBusSource *source;
+
+  g_return_val_if_fail (GST_IS_BUS (bus), NULL);
+
+  source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
+      sizeof (GstBusSource));
+  gst_object_ref (GST_OBJECT_CAST (bus));
+  source->bus = bus;
+
+  return (GSource *) source;
 }
 
 /**
@@ -540,28 +488,19 @@ gst_bus_add_watch_full (GstBus * bus, gint priority,
     GstBusHandler handler, gpointer user_data, GDestroyNotify notify)
 {
   guint id;
-  GstBusWatch *watch;
+  GSource *source;
 
   g_return_val_if_fail (GST_IS_BUS (bus), 0);
 
-  watch = g_new (GstBusWatch, 1);
-
-  gst_object_ref (GST_OBJECT_CAST (bus));
-  watch->source = gst_bus_create_watch (bus);
-  watch->bus = bus;
-  watch->priority = priority;
-  watch->handler = handler;
-  watch->user_data = user_data;
-  watch->notify = notify;
+  source = gst_bus_create_watch (bus);
 
   if (priority != G_PRIORITY_DEFAULT)
-    g_source_set_priority (watch->source, priority);
+    g_source_set_priority (source, priority);
 
-  g_source_set_callback (watch->source, (GSourceFunc) bus_watch_callback,
-      watch, (GDestroyNotify) bus_watch_destroy);
+  g_source_set_callback (source, (GSourceFunc) handler, user_data, notify);
 
-  id = g_source_attach (watch->source, NULL);
-  g_source_unref (watch->source);
+  id = g_source_attach (source, NULL);
+  g_source_unref (source);
 
   return id;
 }
index 59f5920..23881f0 100644 (file)
@@ -64,9 +64,6 @@ struct _GstBus
   GstBusSyncHandler sync_handler;
   gpointer         sync_handler_data;
 
-  gint                     control_socket[2];
-  GIOChannel       *io_channel;
-
   /*< private > */
   gpointer _gst_reserved[GST_PADDING];
 };
@@ -105,6 +102,6 @@ guint                       gst_bus_add_watch               (GstBus * bus,
 GstMessageType         gst_bus_poll                    (GstBus *bus, GstMessageType events,
                                                          GstClockTimeDiff timeout);
 
-
 G_END_DECLS
+
 #endif /* __GST_BUS_H__ */