#include <sys/types.h>
#include "gstinfo.h"
-#include "gstpoll.h"
#include "gstbus.h"
LAST_SIGNAL
};
-#define DEFAULT_ENABLE_ASYNC (TRUE)
-
-enum
-{
- PROP_0,
- PROP_ENABLE_ASYNC
-};
-
static void gst_bus_dispose (GObject * object);
+static void gst_bus_set_main_context (GstBus * bus, GMainContext * ctx);
+
static GstObjectClass *parent_class = NULL;
static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
struct _GstBusPrivate
{
guint num_sync_message_emitters;
+ GCond *queue_cond;
GSource *watch_id;
-
- gboolean enable_async;
- GstPoll *poll;
- GPollFD pollfd;
+ GMainContext *main_context;
};
G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT);
}
static void
-gst_bus_set_property (GObject * object,
- guint prop_id, const GValue * value, GParamSpec * pspec)
-{
- GstBus *bus = GST_BUS_CAST (object);
-
- switch (prop_id) {
- case PROP_ENABLE_ASYNC:
- bus->priv->enable_async = g_value_get_boolean (value);
- break;
- default:
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
- break;
- }
-}
-
-static void
-gst_bus_constructed (GObject * object)
-{
- GstBus *bus = GST_BUS_CAST (object);
-
- if (bus->priv->enable_async) {
- bus->priv->poll = gst_poll_new_timer ();
- gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd);
- }
-}
-
-static void
gst_bus_class_init (GstBusClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
parent_class = g_type_class_peek_parent (klass);
gobject_class->dispose = gst_bus_dispose;
- gobject_class->set_property = gst_bus_set_property;
- gobject_class->constructed = gst_bus_constructed;
-
- /* GstBus:enable-async:
- *
- * Enable async message delivery support for bus watches,
- * gst_bus_pop() and similar API. Without this only the
- * synchronous message handlers are called.
- *
- * This property is used to create the child element buses
- * in #GstBin.
- *
- * Since: 0.10.33
- */
- g_object_class_install_property (gobject_class, PROP_ENABLE_ASYNC,
- g_param_spec_boolean ("enable-async", "Enable Async",
- "Enable async message delivery for bus watches and gst_bus_pop()",
- DEFAULT_ENABLE_ASYNC,
- G_PARAM_CONSTRUCT_ONLY | G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
/**
* GstBus::sync-message:
static void
gst_bus_init (GstBus * bus)
{
- bus->queue = gst_atomic_queue_new (32);
+ bus->queue = g_queue_new ();
bus->queue_lock = g_mutex_new ();
bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate);
- bus->priv->enable_async = DEFAULT_ENABLE_ASYNC;
+ bus->priv->queue_cond = g_cond_new ();
GST_DEBUG_OBJECT (bus, "created");
}
g_mutex_lock (bus->queue_lock);
do {
- message = gst_atomic_queue_pop (bus->queue);
+ message = g_queue_pop_head (bus->queue);
if (message)
gst_message_unref (message);
} while (message != NULL);
- gst_atomic_queue_unref (bus->queue);
+ g_queue_free (bus->queue);
bus->queue = NULL;
g_mutex_unlock (bus->queue_lock);
g_mutex_free (bus->queue_lock);
bus->queue_lock = NULL;
+ g_cond_free (bus->priv->queue_cond);
+ bus->priv->queue_cond = NULL;
+ }
- if (bus->priv->poll)
- gst_poll_free (bus->priv->poll);
- bus->priv->poll = NULL;
+ if (bus->priv->main_context) {
+ g_main_context_unref (bus->priv->main_context);
+ bus->priv->main_context = NULL;
}
G_OBJECT_CLASS (parent_class)->dispose (object);
}
+static void
+gst_bus_wakeup_main_context (GstBus * bus)
+{
+ GMainContext *ctx;
+
+ GST_OBJECT_LOCK (bus);
+ if ((ctx = bus->priv->main_context))
+ g_main_context_ref (ctx);
+ GST_OBJECT_UNLOCK (bus);
+
+ g_main_context_wakeup (ctx);
+
+ if (ctx)
+ g_main_context_unref (ctx);
+}
+
+static void
+gst_bus_set_main_context (GstBus * bus, GMainContext * ctx)
+{
+ GST_OBJECT_LOCK (bus);
+
+ if (bus->priv->main_context != NULL) {
+ g_main_context_unref (bus->priv->main_context);
+ bus->priv->main_context = NULL;
+ }
+
+ if (ctx != NULL) {
+ bus->priv->main_context = g_main_context_ref (ctx);
+ }
+
+ GST_DEBUG_OBJECT (bus, "setting main context to %p, GLib default context: %p",
+ ctx, g_main_context_default ());
+
+ GST_OBJECT_UNLOCK (bus);
+}
+
/**
* gst_bus_new:
*
&& handler != gst_bus_sync_signal_handler)
gst_bus_sync_signal_handler (bus, message, NULL);
- /* If this is a bus without async message delivery
- * always drop the message */
- if (!bus->priv->poll)
- reply = GST_BUS_DROP;
-
/* now see what we should do with the message */
switch (reply) {
case GST_BUS_DROP:
case GST_BUS_PASS:
/* pass the message to the async queue, refcount passed in the queue */
GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
- gst_atomic_queue_push (bus->queue, message);
- gst_poll_write_control (bus->priv->poll);
+ g_mutex_lock (bus->queue_lock);
+ g_queue_push_tail (bus->queue, message);
+ g_cond_broadcast (bus->priv->queue_cond);
+ g_mutex_unlock (bus->queue_lock);
GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
+ gst_bus_wakeup_main_context (bus);
+
break;
case GST_BUS_ASYNC:
{
* queue. When the message is handled by the app and destroyed,
* the cond will be signalled and we can continue */
g_mutex_lock (lock);
+ g_mutex_lock (bus->queue_lock);
+ g_queue_push_tail (bus->queue, message);
+ g_cond_broadcast (bus->priv->queue_cond);
+ g_mutex_unlock (bus->queue_lock);
- gst_atomic_queue_push (bus->queue, message);
- gst_poll_write_control (bus->priv->poll);
+ gst_bus_wakeup_main_context (bus);
/* now block till the message is freed */
g_cond_wait (cond, lock);
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
+ g_mutex_lock (bus->queue_lock);
/* see if there is a message on the bus */
- result = gst_atomic_queue_length (bus->queue) != 0;
+ result = !g_queue_is_empty (bus->queue);
+ g_mutex_unlock (bus->queue_lock);
return result;
}
GstMessageType types)
{
GstMessage *message;
- GTimeVal now, then;
+ GTimeVal *timeval, abstimeout;
gboolean first_round = TRUE;
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_return_val_if_fail (types != 0, NULL);
- g_return_val_if_fail (timeout == 0 || bus->priv->poll != NULL, NULL);
g_mutex_lock (bus->queue_lock);
while (TRUE) {
- gint ret;
+ GST_LOG_OBJECT (bus, "have %d messages", g_queue_get_length (bus->queue));
- GST_LOG_OBJECT (bus, "have %d messages",
- gst_atomic_queue_length (bus->queue));
-
- while ((message = gst_atomic_queue_pop (bus->queue))) {
- if (bus->priv->poll)
- gst_poll_read_control (bus->priv->poll);
+ while ((message = g_queue_pop_head (bus->queue))) {
GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u",
message, GST_MESSAGE_TYPE_NAME (message), (guint) types);
if ((GST_MESSAGE_TYPE (message) & types) != 0) {
if (timeout == 0)
break;
- else if (timeout != GST_CLOCK_TIME_NONE) {
- if (first_round) {
- g_get_current_time (&then);
- first_round = FALSE;
- } else {
- GstClockTime elapsed;
-
- g_get_current_time (&now);
-
- elapsed = GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (then);
- if (timeout > elapsed)
- timeout -= elapsed;
- else
- timeout = 0;
- }
+ if (timeout == GST_CLOCK_TIME_NONE) {
+ /* wait forever */
+ timeval = NULL;
+ } else if (first_round) {
+ glong add = timeout / 1000;
+
+ if (add == 0)
+ /* no need to wait */
+ break;
+
+ /* make timeout absolute */
+ g_get_current_time (&abstimeout);
+ g_time_val_add (&abstimeout, add);
+ timeval = &abstimeout;
+ first_round = FALSE;
+ GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add);
+ } else {
+ /* calculated the absolute end time already, no need to do it again */
+ GST_DEBUG_OBJECT (bus, "blocking for message, again");
+ timeval = &abstimeout; /* fool compiler */
}
-
- /* only here in timeout case */
- g_assert (bus->priv->poll);
- g_mutex_unlock (bus->queue_lock);
- ret = gst_poll_wait (bus->priv->poll, timeout);
- g_mutex_lock (bus->queue_lock);
-
- if (ret == 0) {
+ if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) {
GST_INFO_OBJECT (bus, "timed out, breaking loop");
break;
} else {
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_mutex_lock (bus->queue_lock);
- message = gst_atomic_queue_peek (bus->queue);
+ message = g_queue_peek_head (bus->queue);
if (message)
gst_message_ref (message);
g_mutex_unlock (bus->queue_lock);
{
GSource source;
GstBus *bus;
+ gboolean inited;
} GstBusSource;
static gboolean
gst_bus_source_prepare (GSource * source, gint * timeout)
{
+ GstBusSource *bsrc = (GstBusSource *) source;
+
+ /* we do this here now that we know that we're attached to a main context
+ * (we don't support detaching a source from a main context and then
+ * re-attaching it to a different main context) */
+ if (G_UNLIKELY (!bsrc->inited)) {
+ gst_bus_set_main_context (bsrc->bus, g_source_get_context (source));
+ bsrc->inited = TRUE;
+ }
+
*timeout = -1;
- return FALSE;
+ return gst_bus_have_pending (bsrc->bus);
}
static gboolean
{
GstBusSource *bsrc = (GstBusSource *) source;
- return bsrc->bus->priv->pollfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR);
+ return gst_bus_have_pending (bsrc->bus);
}
static gboolean
bus->priv->watch_id = NULL;
GST_OBJECT_UNLOCK (bus);
+ gst_bus_set_main_context (bsource->bus, NULL);
gst_object_unref (bsource->bus);
bsource->bus = NULL;
}
GstBusSource *source;
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
- g_return_val_if_fail (bus->priv->poll != NULL, NULL);
source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
sizeof (GstBusSource));
source->bus = gst_object_ref (bus);
- g_source_add_poll ((GSource *) source, &bus->priv->pollfd);
+ source->inited = FALSE;
return (GSource *) source;
}