bus->queue = g_queue_new ();
bus->queue_lock = g_mutex_new ();
- if (socketpair (PF_UNIX, SOCK_STREAM, 0, bus->control_socket) < 0)
- goto no_socketpair;
-
- bus->io_channel = g_io_channel_unix_new (bus->control_socket[0]);
-
return;
-
- /* errors */
-no_socketpair:
- {
- g_warning ("cannot create io channel");
- bus->io_channel = NULL;
- }
}
static void
bus = GST_BUS (object);
- if (bus->io_channel) {
- g_io_channel_shutdown (bus->io_channel, TRUE, NULL);
- g_io_channel_unref (bus->io_channel);
- bus->io_channel = NULL;
- }
- close (bus->control_socket[0]);
- close (bus->control_socket[1]);
-
if (bus->queue) {
g_mutex_lock (bus->queue_lock);
g_queue_free (bus->queue);
gboolean
gst_bus_post (GstBus * bus, GstMessage * message)
{
- gchar c;
GstBusSyncReply reply = GST_BUS_PASS;
GstBusSyncHandler handler;
gpointer handler_data;
- gboolean need_write = FALSE;
- ssize_t write_ret = -1;
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
GST_MESSAGE_TYPE (message));
GST_LOCK (bus);
-
if (GST_FLAG_IS_SET (bus, GST_BUS_FLUSHING)) {
gst_message_unref (message);
GST_UNLOCK (bus);
case GST_BUS_PASS:
/* pass the message to the async queue */
g_mutex_lock (bus->queue_lock);
- if (g_queue_get_length (bus->queue) == 0)
- need_write = TRUE;
g_queue_push_tail (bus->queue, message);
g_mutex_unlock (bus->queue_lock);
- if (need_write) {
- c = 'p';
- errno = EAGAIN;
- while (write_ret == -1) {
- switch (errno) {
- case EAGAIN:
- case EINTR:
- break;
- default:
- perror ("gst_bus_post: could not write to fd");
- return FALSE;
- }
- write_ret = write (bus->control_socket[1], &c, 1);
- }
- }
+ /* FIXME cannot assume the source is only in the default context */
+ g_main_context_wakeup (NULL);
+
break;
case GST_BUS_ASYNC:
{
* the cond will be signalled and we can continue */
g_mutex_lock (lock);
g_mutex_lock (bus->queue_lock);
- if (g_queue_get_length (bus->queue) == 0)
- need_write = TRUE;
g_queue_push_tail (bus->queue, message);
g_mutex_unlock (bus->queue_lock);
- if (need_write) {
- c = 'p';
- errno = EAGAIN;
- while (write_ret == -1) {
- switch (errno) {
- case EAGAIN:
- case EINTR:
- break;
- default:
- perror ("gst_bus_post: could not write to fd");
- return FALSE;
- }
- write_ret = write (bus->control_socket[1], &c, 1);
- }
- }
+ /* FIXME cannot assume the source is only in the default context */
+ g_main_context_wakeup (NULL);
/* now block till the message is freed */
g_cond_wait (cond, lock);
gst_bus_pop (GstBus * bus)
{
GstMessage *message;
- gboolean needs_read = FALSE;
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_mutex_lock (bus->queue_lock);
message = g_queue_pop_head (bus->queue);
- if (message && g_queue_get_length (bus->queue) == 0)
- needs_read = TRUE;
g_mutex_unlock (bus->queue_lock);
- if (needs_read) {
- gchar c;
- ssize_t read_ret = -1;
-
- /* the char in the fd is essentially just a way to wake us up. read it off so
- we're not woken up again. */
- errno = EAGAIN;
- while (read_ret == -1) {
- switch (errno) {
- case EAGAIN:
- case EINTR:
- break;
- default:
- perror ("gst_bus_pop: could not read from fd");
- return NULL;
- }
- read_ret = read (bus->control_socket[0], &c, 1);
- }
- }
-
return message;
}
GST_UNLOCK (bus);
}
-/**
- * gst_bus_create_watch:
- * @bus: a #GstBus to create the watch for
- *
- * Create watch for this bus.
- *
- * Returns: A #GSource that can be added to a mainloop.
+/* GSource for the bus
*/
-GSource *
-gst_bus_create_watch (GstBus * bus)
+typedef struct
{
- GSource *source;
-
- g_return_val_if_fail (GST_IS_BUS (bus), NULL);
-
- /* FIXME, we need to ref the bus and unref it when the source
- * is destroyed */
- source = g_io_create_watch (bus->io_channel, G_IO_IN);
+ GSource source;
+ GstBus *bus;
+} GstBusSource;
- return source;
+gboolean
+gst_bus_source_prepare (GSource * source, gint * timeout)
+{
+ *timeout = -1;
+ return gst_bus_have_pending (((GstBusSource *) source)->bus);
}
-typedef struct
+gboolean
+gst_bus_source_check (GSource * source)
{
- GSource *source;
- GstBus *bus;
- gint priority;
- GstBusHandler handler;
- gpointer user_data;
- GDestroyNotify notify;
-} GstBusWatch;
+ return gst_bus_have_pending (((GstBusSource *) source)->bus);
+}
-static gboolean
-bus_watch_callback (GIOChannel * channel, GIOCondition cond,
- GstBusWatch * watch)
+gboolean
+gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
+ gpointer user_data)
{
+ GstBusHandler handler = (GstBusHandler) callback;
+ GstBusSource *bsource = (GstBusSource *) source;
GstMessage *message;
gboolean needs_pop = TRUE;
- g_return_val_if_fail (GST_IS_BUS (watch->bus), FALSE);
+ g_return_val_if_fail (GST_IS_BUS (bsource->bus), FALSE);
- message = gst_bus_peek (watch->bus);
+ message = gst_bus_peek (bsource->bus);
g_return_val_if_fail (message != NULL, TRUE);
- if (watch->handler)
- needs_pop = watch->handler (watch->bus, message, watch->user_data);
+ if (!handler) {
+ g_warning ("GstBus watch dispatched without callback\n"
+ "You must call g_source_connect().");
+ return FALSE;
+ }
+
+ needs_pop = handler (bsource->bus, message, user_data);
if (needs_pop)
- gst_message_unref (gst_bus_pop (watch->bus));
+ gst_message_unref (gst_bus_pop (bsource->bus));
return TRUE;
}
-static void
-bus_watch_destroy (GstBusWatch * watch)
+void
+gst_bus_source_finalize (GSource * source)
{
- if (watch->notify) {
- watch->notify (watch->user_data);
- }
- gst_object_unref (GST_OBJECT_CAST (watch->bus));
- g_free (watch);
+ GstBusSource *bsource = (GstBusSource *) source;
+
+ gst_object_unref (GST_OBJECT_CAST (bsource->bus));
+}
+
+static GSourceFuncs gst_bus_source_funcs = {
+ gst_bus_source_prepare,
+ gst_bus_source_check,
+ gst_bus_source_dispatch,
+ gst_bus_source_finalize
+};
+
+/**
+ * gst_bus_create_watch:
+ * @bus: a #GstBus to create the watch for
+ *
+ * Create watch for this bus.
+ *
+ * Returns: A #GSource that can be added to a mainloop.
+ */
+GSource *
+gst_bus_create_watch (GstBus * bus)
+{
+ GstBusSource *source;
+
+ g_return_val_if_fail (GST_IS_BUS (bus), NULL);
+
+ source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
+ sizeof (GstBusSource));
+ gst_object_ref (GST_OBJECT_CAST (bus));
+ source->bus = bus;
+
+ return (GSource *) source;
}
/**
GstBusHandler handler, gpointer user_data, GDestroyNotify notify)
{
guint id;
- GstBusWatch *watch;
+ GSource *source;
g_return_val_if_fail (GST_IS_BUS (bus), 0);
- watch = g_new (GstBusWatch, 1);
-
- gst_object_ref (GST_OBJECT_CAST (bus));
- watch->source = gst_bus_create_watch (bus);
- watch->bus = bus;
- watch->priority = priority;
- watch->handler = handler;
- watch->user_data = user_data;
- watch->notify = notify;
+ source = gst_bus_create_watch (bus);
if (priority != G_PRIORITY_DEFAULT)
- g_source_set_priority (watch->source, priority);
+ g_source_set_priority (source, priority);
- g_source_set_callback (watch->source, (GSourceFunc) bus_watch_callback,
- watch, (GDestroyNotify) bus_watch_destroy);
+ g_source_set_callback (source, (GSourceFunc) handler, user_data, notify);
- id = g_source_attach (watch->source, NULL);
- g_source_unref (watch->source);
+ id = g_source_attach (source, NULL);
+ g_source_unref (source);
return id;
}