/**
* SECTION:gstbus
+ * @title: GstBus
* @short_description: Asynchronous message bus subsystem
* @see_also: #GstMessage, #GstElement
*
#include "gstbus.h"
#include "glib-compat-private.h"
+#ifdef G_OS_WIN32
+# ifndef EWOULDBLOCK
+# define EWOULDBLOCK EAGAIN /* This is just to placate gcc */
+# endif
+#endif /* G_OS_WIN32 */
+
#define GST_CAT_DEFAULT GST_CAT_BUS
/* bus signals */
enum
bus->priv->poll = gst_poll_new_timer ();
gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd);
}
+
+ G_OBJECT_CLASS (gst_bus_parent_class)->constructed (object);
}
static void
is_flushing:
{
GST_DEBUG_OBJECT (bus, "bus is flushing");
- gst_message_unref (message);
GST_OBJECT_UNLOCK (bus);
+ gst_message_unref (message);
return FALSE;
}
gst_bus_set_flushing (GstBus * bus, gboolean flushing)
{
GstMessage *message;
+ GList *message_list = NULL;
+
+ g_return_if_fail (GST_IS_BUS (bus));
GST_OBJECT_LOCK (bus);
GST_DEBUG_OBJECT (bus, "set bus flushing");
while ((message = gst_bus_pop (bus)))
- gst_message_unref (message);
+ message_list = g_list_prepend (message_list, message);
} else {
GST_DEBUG_OBJECT (bus, "unset bus flushing");
GST_OBJECT_FLAG_UNSET (bus, GST_BUS_FLUSHING);
}
GST_OBJECT_UNLOCK (bus);
+
+ g_list_free_full (message_list, (GDestroyNotify) gst_message_unref);
}
/**
gst_atomic_queue_length (bus->priv->queue));
while ((message = gst_atomic_queue_pop (bus->priv->queue))) {
- if (bus->priv->poll)
- gst_poll_read_control (bus->priv->poll);
+ if (bus->priv->poll) {
+ while (!gst_poll_read_control (bus->priv->poll)) {
+ if (errno == EWOULDBLOCK) {
+ /* Retry, this can happen if pushing to the queue has finished,
+ * popping here succeeded but writing control did not finish
+ * before we got to this line. */
+ /* Give other threads the chance to do something */
+ g_thread_yield ();
+ continue;
+ } else {
+ /* This is a real error and means that either the bus is in an
+ * inconsistent state, or the GstPoll is invalid. GstPoll already
+ * prints a critical warning about this, no need to do that again
+ * ourselves */
+ break;
+ }
+ }
+ }
GST_DEBUG_OBJECT (bus, "got message %p, %s from %s, type mask is %u",
message, GST_MESSAGE_TYPE_NAME (message),
}
source = gst_bus_create_watch (bus);
+ if (!source) {
+ g_critical ("Creating bus watch failed");
+ return 0;
+ }
if (priority != G_PRIORITY_DEFAULT)
g_source_set_priority (source, priority);
* There can only be a single bus watch per bus, you must remove it before you
* can set a new one.
*
+ * The bus watch will only work if a GLib main loop is being run.
+ *
* When @func is called, the message belongs to the caller; if you want to
* keep a copy of it, call gst_message_ref() before leaving @func.
*
* There can only be a single bus watch per bus, you must remove it before you
* can set a new one.
*
+ * The bus watch will only work if a GLib main loop is being run.
+ *
* The watch can be removed using gst_bus_remove_watch() or by returning %FALSE
* from @func. If the watch was added to the default main context it is also
* possible to remove the watch using g_source_remove().
GstMessage *ret;
gulong id;
+ g_return_val_if_fail (GST_IS_BUS (bus), NULL);
+
poll_data = g_slice_new (GstBusPollData);
poll_data->source_running = TRUE;
poll_data->loop = g_main_loop_new (NULL, FALSE);