if (flushing) {
GST_FLAG_SET (bus, GST_BUS_FLUSHING);
- GST_DEBUG ("set bus flushing");
+ GST_DEBUG_OBJECT (bus, "set bus flushing");
while ((message = gst_bus_pop (bus)))
gst_message_unref (message);
} else {
- GST_DEBUG ("unset bus flushing");
+ GST_DEBUG_OBJECT (bus, "unset bus flushing");
GST_FLAG_UNSET (bus, GST_BUS_FLUSHING);
}
message = g_queue_pop_head (bus->queue);
g_mutex_unlock (bus->queue_lock);
- GST_DEBUG ("pop on bus, got message %p", message);
+ GST_DEBUG_OBJECT (bus, "pop on bus, got message %p", message);
return message;
}
* gst_bus_peek:
* @bus: a #GstBus
*
- * Peek the message on the top of the bus' queue. The bus maintains ownership of
- * the message, and the message will remain on the bus' message queue.
+ * Peek the message on the top of the bus' queue. The message will remain
+ * on the bus' message queue. A reference is returned, and needs to be freed
+ * by the caller.
*
* Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
*
* MT safe.
*/
-/* FIXME, dangerous as the bus could be set to flushing while the app holds
- * a ref to the message */
GstMessage *
gst_bus_peek (GstBus * bus)
{
g_mutex_lock (bus->queue_lock);
message = g_queue_peek_head (bus->queue);
+ if (message)
+ gst_message_ref (message);
g_mutex_unlock (bus->queue_lock);
- GST_DEBUG ("peek on bus, got message %p", message);
+ GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
return message;
}
message = gst_bus_peek (bus);
- GST_DEBUG ("have message %p", message);
+ GST_DEBUG_OBJECT (bus, "source %p have message %p", source, message);
g_return_val_if_fail (message != NULL, TRUE);
if (!handler)
goto no_handler;
- GST_DEBUG ("calling dispatch with %p", message);
+ GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
needs_pop = handler (bus, message, user_data);
+ gst_message_unref (message);
- GST_DEBUG ("handler returns %d", needs_pop);
+ GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, needs_pop);
if (needs_pop) {
message = gst_bus_pop (bus);
if (message) {
{
g_warning ("GstBus watch dispatched without callback\n"
"You must call g_source_connect().");
+ gst_message_unref (message);
return FALSE;
}
}
id = g_source_attach (source, NULL);
g_source_unref (source);
+ GST_DEBUG_OBJECT (bus, "New source %p\n", source);
return id;
}
{
GMainLoop *loop;
guint timeout_id;
+ gboolean source_running;
GstMessageType events;
GstMessageType revent;
} GstBusPollData;
static gboolean
poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
{
+ if (!g_main_loop_is_running (poll_data->loop))
+ return FALSE;
+
if (GST_MESSAGE_TYPE (message) & poll_data->events) {
poll_data->revent = GST_MESSAGE_TYPE (message);
- if (g_main_loop_is_running (poll_data->loop))
- g_main_loop_quit (poll_data->loop);
+ g_main_loop_quit (poll_data->loop);
+
/* keep the message on the queue */
return FALSE;
} else {
static gboolean
poll_timeout (GstBusPollData * poll_data)
{
- poll_data->timeout_id = 0;
g_main_loop_quit (poll_data->loop);
+
/* returning FALSE will remove the source id */
return FALSE;
}
+static void
+poll_destroy (GstBusPollData * poll_data)
+{
+ poll_data->source_running = FALSE;
+ if (!poll_data->timeout_id) {
+ g_main_loop_unref (poll_data->loop);
+ g_free (poll_data);
+ }
+}
+
+static void
+poll_destroy_timeout (GstBusPollData * poll_data)
+{
+ poll_data->timeout_id = 0;
+ if (!poll_data->source_running) {
+ g_main_loop_unref (poll_data->loop);
+ g_free (poll_data);
+ }
+}
+
/**
* gst_bus_poll:
* @bus: a #GstBus
GstMessageType
gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
{
- GstBusPollData poll_data;
+ GstBusPollData *poll_data;
GstMessageType ret;
guint id;
+ poll_data = g_new0 (GstBusPollData, 1);
+ g_return_val_if_fail (poll_data != NULL, GST_MESSAGE_UNKNOWN);
+
+ poll_data->source_running = TRUE;
+ poll_data->loop = g_main_loop_new (NULL, FALSE);
+ poll_data->events = events;
+ poll_data->revent = GST_MESSAGE_UNKNOWN;
+
if (timeout >= 0)
- poll_data.timeout_id = g_timeout_add (timeout / GST_MSECOND,
- (GSourceFunc) poll_timeout, &poll_data);
+ poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
+ timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
+ (GDestroyNotify) poll_destroy_timeout);
else
- poll_data.timeout_id = 0;
+ poll_data->timeout_id = 0;
- poll_data.loop = g_main_loop_new (NULL, FALSE);
- poll_data.events = events;
- poll_data.revent = GST_MESSAGE_UNKNOWN;
+ id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE,
+ (GstBusHandler) poll_handler, poll_data, (GDestroyNotify) poll_destroy);
+ g_main_loop_run (poll_data->loop);
+ ret = poll_data->revent;
- id = gst_bus_add_watch (bus, (GstBusHandler) poll_handler, &poll_data);
- g_main_loop_run (poll_data.loop);
- g_source_remove (id);
+ if (poll_data->timeout_id)
+ g_source_remove (poll_data->timeout_id);
- ret = poll_data.revent;
+ /* poll_data may get destroyed at any time now */
+ g_source_remove (id);
- if (poll_data.timeout_id)
- g_source_remove (poll_data.timeout_id);
- g_main_loop_unref (poll_data.loop);
+ GST_DEBUG_OBJECT (bus, "finished poll with messagetype %d", ret);
return ret;
}