* Note that a #GstPipeline will set its bus into flushing state when changing
* from READY to NULL state.
*
- * Last reviewed on 2006-03-12 (0.10.5)
+ * Last reviewed on 2012-03-28 (0.11.3)
*/
#include "gst_private.h"
#endif
#include <sys/types.h>
+#include "gstatomicqueue.h"
#include "gstinfo.h"
#include "gstpoll.h"
#include "gstbus.h"
+#include "glib-compat-private.h"
#define GST_CAT_DEFAULT GST_CAT_BUS
/* bus signals */
static void gst_bus_dispose (GObject * object);
-static GstObjectClass *parent_class = NULL;
static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
struct _GstBusPrivate
{
+ GstAtomicQueue *queue;
+ GMutex queue_lock;
+
+ GstBusSyncHandler sync_handler;
+ gpointer sync_handler_data;
+
+ guint signal_watch_id;
+ guint num_signal_watchers;
+
guint num_sync_message_emitters;
GSource *watch_id;
GPollFD pollfd;
};
+#define gst_bus_parent_class parent_class
G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT);
static void
{
GObjectClass *gobject_class = (GObjectClass *) klass;
- parent_class = g_type_class_peek_parent (klass);
-
gobject_class->dispose = gst_bus_dispose;
gobject_class->set_property = gst_bus_set_property;
gobject_class->constructed = gst_bus_constructed;
g_signal_new ("sync-message", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
G_STRUCT_OFFSET (GstBusClass, sync_message), NULL, NULL,
- g_cclosure_marshal_VOID__BOXED, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
+ g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
/**
* GstBus::message:
g_signal_new ("message", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
G_STRUCT_OFFSET (GstBusClass, message), NULL, NULL,
- g_cclosure_marshal_VOID__BOXED, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
+ g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
g_type_class_add_private (klass, sizeof (GstBusPrivate));
}
static void
gst_bus_init (GstBus * bus)
{
- bus->queue = gst_atomic_queue_new (32);
- bus->queue_lock = g_mutex_new ();
-
bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate);
bus->priv->enable_async = DEFAULT_ENABLE_ASYNC;
+ g_mutex_init (&bus->priv->queue_lock);
+ bus->priv->queue = gst_atomic_queue_new (32);
GST_DEBUG_OBJECT (bus, "created");
}
{
GstBus *bus = GST_BUS (object);
- if (bus->queue) {
+ if (bus->priv->queue) {
GstMessage *message;
- g_mutex_lock (bus->queue_lock);
+ g_mutex_lock (&bus->priv->queue_lock);
do {
- message = gst_atomic_queue_pop (bus->queue);
+ message = gst_atomic_queue_pop (bus->priv->queue);
if (message)
gst_message_unref (message);
} while (message != NULL);
- gst_atomic_queue_unref (bus->queue);
- bus->queue = NULL;
- g_mutex_unlock (bus->queue_lock);
- g_mutex_free (bus->queue_lock);
- bus->queue_lock = NULL;
+ gst_atomic_queue_unref (bus->priv->queue);
+ bus->priv->queue = NULL;
+ g_mutex_unlock (&bus->priv->queue_lock);
+ g_mutex_clear (&bus->priv->queue_lock);
if (bus->priv->poll)
gst_poll_free (bus->priv->poll);
if (GST_OBJECT_FLAG_IS_SET (bus, GST_BUS_FLUSHING))
goto is_flushing;
- handler = bus->sync_handler;
- handler_data = bus->sync_handler_data;
+ handler = bus->priv->sync_handler;
+ handler_data = bus->priv->sync_handler_data;
emit_sync_message = bus->priv->num_sync_message_emitters > 0;
GST_OBJECT_UNLOCK (bus);
case GST_BUS_PASS:
/* pass the message to the async queue, refcount passed in the queue */
GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
- gst_atomic_queue_push (bus->queue, message);
+ gst_atomic_queue_push (bus->priv->queue, message);
gst_poll_write_control (bus->priv->poll);
GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
{
/* async delivery, we need a mutex and a cond to block
* on */
- GMutex *lock = g_mutex_new ();
- GCond *cond = g_cond_new ();
+ GCond *cond = GST_MESSAGE_GET_COND (message);
+ GMutex *lock = GST_MESSAGE_GET_LOCK (message);
- GST_MESSAGE_COND (message) = cond;
- GST_MESSAGE_GET_LOCK (message) = lock;
+ g_cond_init (cond);
+ g_mutex_init (lock);
GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
* the cond will be signalled and we can continue */
g_mutex_lock (lock);
- gst_atomic_queue_push (bus->queue, message);
+ gst_atomic_queue_push (bus->priv->queue, message);
gst_poll_write_control (bus->priv->poll);
/* now block till the message is freed */
GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
- g_mutex_free (lock);
- g_cond_free (cond);
+ g_mutex_clear (lock);
+ g_cond_clear (cond);
break;
}
default:
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
/* see if there is a message on the bus */
- result = gst_atomic_queue_length (bus->queue) != 0;
+ result = gst_atomic_queue_length (bus->priv->queue) != 0;
return result;
}
g_return_val_if_fail (types != 0, NULL);
g_return_val_if_fail (timeout == 0 || bus->priv->poll != NULL, NULL);
- g_mutex_lock (bus->queue_lock);
+ g_mutex_lock (&bus->priv->queue_lock);
while (TRUE) {
gint ret;
GST_LOG_OBJECT (bus, "have %d messages",
- gst_atomic_queue_length (bus->queue));
+ gst_atomic_queue_length (bus->priv->queue));
- while ((message = gst_atomic_queue_pop (bus->queue))) {
+ while ((message = gst_atomic_queue_pop (bus->priv->queue))) {
if (bus->priv->poll)
gst_poll_read_control (bus->priv->poll);
- GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u",
- message, GST_MESSAGE_TYPE_NAME (message), (guint) types);
+
+ GST_DEBUG_OBJECT (bus, "got message %p, %s from %s, type mask is %u",
+ message, GST_MESSAGE_TYPE_NAME (message),
+ GST_MESSAGE_SRC_NAME (message), (guint) types);
if ((GST_MESSAGE_TYPE (message) & types) != 0) {
/* exit the loop, we have a message */
goto beach;
/* only here in timeout case */
g_assert (bus->priv->poll);
- g_mutex_unlock (bus->queue_lock);
+ g_mutex_unlock (&bus->priv->queue_lock);
ret = gst_poll_wait (bus->priv->poll, timeout - elapsed);
- g_mutex_lock (bus->queue_lock);
+ g_mutex_lock (&bus->priv->queue_lock);
if (ret == 0) {
GST_INFO_OBJECT (bus, "timed out, breaking loop");
beach:
- g_mutex_unlock (bus->queue_lock);
+ g_mutex_unlock (&bus->priv->queue_lock);
return message;
}
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
- g_mutex_lock (bus->queue_lock);
- message = gst_atomic_queue_peek (bus->queue);
+ g_mutex_lock (&bus->priv->queue_lock);
+ message = gst_atomic_queue_peek (bus->priv->queue);
if (message)
gst_message_ref (message);
- g_mutex_unlock (bus->queue_lock);
+ g_mutex_unlock (&bus->priv->queue_lock);
GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
/* Assert if the user attempts to replace an existing sync_handler,
* other than to clear it */
- if (func != NULL && bus->sync_handler != NULL)
+ if (func != NULL && bus->priv->sync_handler != NULL)
goto no_replace;
- bus->sync_handler = func;
- bus->sync_handler_data = data;
+ bus->priv->sync_handler = func;
+ bus->priv->sync_handler_data = data;
GST_OBJECT_UNLOCK (bus);
return;
if (!handler)
goto no_handler;
- GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
+ GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %" GST_PTR_FORMAT,
+ source, message);
keep = handler (bus, message, user_data);
gst_message_unref (message);
source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
sizeof (GstBusSource));
+
+ g_source_set_name ((GSource *) source, "GStreamer message bus watch");
+
source->bus = gst_object_ref (bus);
g_source_add_poll ((GSource *) source, &bus->priv->pollfd);
* from @func.
*
* Returns: The event source id.
- *
+ * Rename to: gst_bus_add_watch
* MT safe.
*/
guint
}
/**
- * gst_bus_add_watch:
+ * gst_bus_add_watch: (skip)
* @bus: a #GstBus to create the watch for
* @func: A function to call when a message is received.
* @user_data: user data passed to @func.
* @bus: a #GstBus
* @events: a mask of #GstMessageType, representing the set of message types to
* poll for.
- * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll
+ * @timeout: the poll timeout, as a #GstClockTime, or #GST_CLOCK_TIME_NONE to poll
* indefinitely.
*
* Poll the bus for messages. Will block while waiting for messages to come.
* unreffed with gst_message_unref() after usage.
*/
GstMessage *
-gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
+gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTime timeout)
{
GstBusPollData *poll_data;
GstMessage *ret;
poll_data->events = events;
poll_data->message = NULL;
- if (timeout >= 0)
+ if (timeout != GST_CLOCK_TIME_NONE)
poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
(GDestroyNotify) poll_destroy_timeout);
gst_bus_disable_sync_message_emission (GstBus * bus)
{
g_return_if_fail (GST_IS_BUS (bus));
- g_return_if_fail (bus->num_signal_watchers == 0);
+ g_return_if_fail (bus->priv->num_signal_watchers == 0);
GST_OBJECT_LOCK (bus);
bus->priv->num_sync_message_emitters--;
* responsible for calling gst_bus_remove_signal_watch() as many times as this
* function is called.
*
- * There can only be a single bus watch per bus, you most remove all signal watch
- * before you can set another type of watch.
+ * There can only be a single bus watch per bus, you must remove any signal
+ * watch before you can set another type of watch.
*
* MT safe.
*/
/* I know the callees don't take this lock, so go ahead and abuse it */
GST_OBJECT_LOCK (bus);
- if (bus->num_signal_watchers > 0)
+ if (bus->priv->num_signal_watchers > 0)
goto done;
/* this should not fail because the counter above takes care of it */
- g_assert (bus->signal_watch_id == 0);
+ g_assert (bus->priv->signal_watch_id == 0);
- bus->signal_watch_id =
+ bus->priv->signal_watch_id =
gst_bus_add_watch_full_unlocked (bus, priority, gst_bus_async_signal_func,
NULL, NULL);
- if (G_UNLIKELY (bus->signal_watch_id == 0))
+ if (G_UNLIKELY (bus->priv->signal_watch_id == 0))
goto add_failed;
done:
- bus->num_signal_watchers++;
+ bus->priv->num_signal_watchers++;
GST_OBJECT_UNLOCK (bus);
return;
/* I know the callees don't take this lock, so go ahead and abuse it */
GST_OBJECT_LOCK (bus);
- if (bus->num_signal_watchers == 0)
+ if (bus->priv->num_signal_watchers == 0)
goto error;
- bus->num_signal_watchers--;
+ bus->priv->num_signal_watchers--;
- if (bus->num_signal_watchers > 0)
+ if (bus->priv->num_signal_watchers > 0)
goto done;
- id = bus->signal_watch_id;
- bus->signal_watch_id = 0;
+ id = bus->priv->signal_watch_id;
+ bus->priv->signal_watch_id = 0;
GST_DEBUG_OBJECT (bus, "removing signal watch %u", id);