/*
 * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
 *
 * gstbus.c: GstBus subsystem
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */
25 #include <sys/types.h>
26 #include <sys/socket.h>
28 #include "gst_private.h"
38 static void gst_bus_class_init (GstBusClass * klass);
39 static void gst_bus_init (GstBus * bus);
40 static void gst_bus_dispose (GObject * object);
42 static void gst_bus_set_property (GObject * object, guint prop_id,
43 const GValue * value, GParamSpec * pspec);
44 static void gst_bus_get_property (GObject * object, guint prop_id,
45 GValue * value, GParamSpec * pspec);
47 static GstObjectClass *parent_class = NULL;
49 /* static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; */
52 gst_bus_get_type (void)
54 static GType bus_type = 0;
57 static const GTypeInfo bus_info = {
61 (GClassInitFunc) gst_bus_class_init,
66 (GInstanceInitFunc) gst_bus_init,
70 bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
76 gst_bus_class_init (GstBusClass * klass)
78 GObjectClass *gobject_class;
79 GstObjectClass *gstobject_class;
81 gobject_class = (GObjectClass *) klass;
82 gstobject_class = (GstObjectClass *) klass;
84 parent_class = g_type_class_ref (GST_TYPE_OBJECT);
86 if (!g_thread_supported ())
89 gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
90 gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
91 gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
95 gst_bus_init (GstBus * bus)
97 bus->queue = g_queue_new ();
98 bus->queue_lock = g_mutex_new ();
100 GST_DEBUG_OBJECT (bus, "created");
106 gst_bus_dispose (GObject * object)
110 bus = GST_BUS (object);
113 g_mutex_lock (bus->queue_lock);
114 g_queue_free (bus->queue);
116 g_mutex_unlock (bus->queue_lock);
117 g_mutex_free (bus->queue_lock);
118 bus->queue_lock = NULL;
121 G_OBJECT_CLASS (parent_class)->dispose (object);
125 gst_bus_set_property (GObject * object, guint prop_id,
126 const GValue * value, GParamSpec * pspec)
130 bus = GST_BUS (object);
134 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
140 gst_bus_get_property (GObject * object, guint prop_id,
141 GValue * value, GParamSpec * pspec)
145 bus = GST_BUS (object);
149 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
159 result = g_object_new (gst_bus_get_type (), NULL);
166 * @bus: a #GstBus to post on
167 * @message: The #GstMessage to post
169 * Post a message on the given bus. Ownership of the message
170 * is taken by the bus.
172 * Returns: TRUE if the message could be posted.
177 gst_bus_post (GstBus * bus, GstMessage * message)
179 GstBusSyncReply reply = GST_BUS_PASS;
180 GstBusSyncHandler handler;
181 gpointer handler_data;
183 g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
184 g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
186 GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus, type %d",
187 message, GST_MESSAGE_TYPE (message));
190 if (GST_FLAG_IS_SET (bus, GST_BUS_FLUSHING)) {
191 GST_DEBUG_OBJECT (bus, "bus is flushing");
192 gst_message_unref (message);
197 handler = bus->sync_handler;
198 handler_data = bus->sync_handler_data;
202 /* first call the sync handler if it is installed */
204 reply = handler (bus, message, handler_data);
207 /* now see what we should do with the message */
210 /* drop the message */
211 GST_DEBUG_OBJECT (bus, "[msg %p] dropped", message);
214 /* pass the message to the async queue, refcount passed in the queue */
215 GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
216 g_mutex_lock (bus->queue_lock);
217 g_queue_push_tail (bus->queue, message);
218 g_mutex_unlock (bus->queue_lock);
219 GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
221 /* FIXME cannot assume the source is only in the default context */
222 g_main_context_wakeup (NULL);
227 /* async delivery, we need a mutex and a cond to block
229 GMutex *lock = g_mutex_new ();
230 GCond *cond = g_cond_new ();
232 GST_MESSAGE_COND (message) = cond;
233 GST_MESSAGE_GET_LOCK (message) = lock;
235 GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
237 /* now we lock the message mutex, send the message to the async
238 * queue. When the message is handled by the app and destroyed,
239 * the cond will be signalled and we can continue */
241 g_mutex_lock (bus->queue_lock);
242 g_queue_push_tail (bus->queue, message);
243 g_mutex_unlock (bus->queue_lock);
245 /* FIXME cannot assume the source is only in the default context */
246 g_main_context_wakeup (NULL);
248 /* now block till the message is freed */
249 g_cond_wait (cond, lock);
250 g_mutex_unlock (lock);
252 GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
264 * gst_bus_have_pending:
265 * @bus: a #GstBus to check
267 * Check if there are pending messages on the bus that should be
270 * Returns: TRUE if there are messages on the bus to be handled.
275 gst_bus_have_pending (GstBus * bus)
279 g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
281 g_mutex_lock (bus->queue_lock);
282 length = g_queue_get_length (bus->queue);
283 g_mutex_unlock (bus->queue_lock);
289 * gst_bus_set_flushing:
291 * @flushing: whether or not to flush the bus
293 * If @flushing, flush out and unref any messages queued in the bus. Releases
294 * references to the message origin objects. Will flush future messages until
295 * gst_bus_set_flushing() sets @flushing to #FALSE.
300 gst_bus_set_flushing (GstBus * bus, gboolean flushing)
307 GST_FLAG_SET (bus, GST_BUS_FLUSHING);
309 GST_DEBUG_OBJECT (bus, "set bus flushing");
311 while ((message = gst_bus_pop (bus)))
312 gst_message_unref (message);
314 GST_DEBUG_OBJECT (bus, "unset bus flushing");
315 GST_FLAG_UNSET (bus, GST_BUS_FLUSHING);
323 * @bus: a #GstBus to pop
325 * Get a message from the bus.
327 * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
332 gst_bus_pop (GstBus * bus)
336 g_return_val_if_fail (GST_IS_BUS (bus), NULL);
338 g_mutex_lock (bus->queue_lock);
339 message = g_queue_pop_head (bus->queue);
340 g_mutex_unlock (bus->queue_lock);
342 GST_DEBUG_OBJECT (bus, "pop on bus, got message %p", message);
351 * Peek the message on the top of the bus' queue. The message will remain
352 * on the bus' message queue. A reference is returned, and needs to be freed
355 * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
360 gst_bus_peek (GstBus * bus)
364 g_return_val_if_fail (GST_IS_BUS (bus), NULL);
366 g_mutex_lock (bus->queue_lock);
367 message = g_queue_peek_head (bus->queue);
369 gst_message_ref (message);
370 g_mutex_unlock (bus->queue_lock);
372 GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
378 * gst_bus_set_sync_handler:
379 * @bus: a #GstBus to install the handler on
380 * @func: The handler function to install
381 * @data: User data that will be sent to the handler function.
383 * Install a synchronous handler on the bus. The function will be called
384 * every time a new message is posted on the bus. Note that the function
385 * will be called in the same thread context as the posting object.
388 gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
390 g_return_if_fail (GST_IS_BUS (bus));
393 bus->sync_handler = func;
394 bus->sync_handler_data = data;
398 /* GSource for the bus
407 gst_bus_source_prepare (GSource * source, gint * timeout)
410 return gst_bus_have_pending (((GstBusSource *) source)->bus);
414 gst_bus_source_check (GSource * source)
416 return gst_bus_have_pending (((GstBusSource *) source)->bus);
420 gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
423 GstBusHandler handler = (GstBusHandler) callback;
424 GstBusSource *bsource = (GstBusSource *) source;
426 gboolean needs_pop = TRUE;
429 g_return_val_if_fail (bsource != NULL, FALSE);
433 g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
435 message = gst_bus_peek (bus);
437 GST_DEBUG_OBJECT (bus, "source %p have message %p", source, message);
439 g_return_val_if_fail (message != NULL, TRUE);
444 GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
446 needs_pop = handler (bus, message, user_data);
447 gst_message_unref (message);
449 GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, needs_pop);
451 message = gst_bus_pop (bus);
453 gst_message_unref (message);
455 /* after executing the handler, the app could have disposed
456 * the pipeline and set the bus to flushing. It is possible
457 * then that there are no more messages on the bus. this is
459 GST_DEBUG ("handler requested pop but no message on the bus");
466 g_warning ("GstBus watch dispatched without callback\n"
467 "You must call g_source_connect().");
468 gst_message_unref (message);
474 gst_bus_source_finalize (GSource * source)
476 GstBusSource *bsource = (GstBusSource *) source;
478 gst_object_unref (bsource->bus);
482 static GSourceFuncs gst_bus_source_funcs = {
483 gst_bus_source_prepare,
484 gst_bus_source_check,
485 gst_bus_source_dispatch,
486 gst_bus_source_finalize
490 * gst_bus_create_watch:
491 * @bus: a #GstBus to create the watch for
493 * Create watch for this bus.
495 * Returns: A #GSource that can be added to a mainloop.
498 gst_bus_create_watch (GstBus * bus)
500 GstBusSource *source;
502 g_return_val_if_fail (GST_IS_BUS (bus), NULL);
504 source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
505 sizeof (GstBusSource));
506 gst_object_ref (bus);
509 return (GSource *) source;
513 * gst_bus_add_watch_full:
514 * @bus: a #GstBus to create the watch for.
515 * @priority: The priority of the watch.
516 * @handler: A function to call when a message is received.
517 * @user_data: user data passed to @handler.
518 * @notify: the function to call when the source is removed.
520 * Adds the bus to the mainloop with the given priority. If the handler returns
521 * TRUE, the message will then be popped off the queue. When the handler is
522 * called, the message belongs to the caller; if you want to keep a copy of it,
523 * call gst_message_ref before leaving the handler.
525 * Returns: The event source id.
530 gst_bus_add_watch_full (GstBus * bus, gint priority,
531 GstBusHandler handler, gpointer user_data, GDestroyNotify notify)
536 g_return_val_if_fail (GST_IS_BUS (bus), 0);
538 source = gst_bus_create_watch (bus);
540 if (priority != G_PRIORITY_DEFAULT)
541 g_source_set_priority (source, priority);
543 g_source_set_callback (source, (GSourceFunc) handler, user_data, notify);
545 id = g_source_attach (source, NULL);
546 g_source_unref (source);
548 GST_DEBUG_OBJECT (bus, "New source %p", source);
554 * @bus: a #GstBus to create the watch for
555 * @handler: A function to call when a message is received.
556 * @user_data: user data passed to @handler.
558 * Adds the bus to the mainloop with the default priority.
560 * Returns: The event source id.
565 gst_bus_add_watch (GstBus * bus, GstBusHandler handler, gpointer user_data)
567 return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, handler, user_data,
575 gboolean source_running;
576 GstMessageType events;
577 GstMessageType revent;
581 poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
583 if (!g_main_loop_is_running (poll_data->loop))
586 if (GST_MESSAGE_TYPE (message) & poll_data->events) {
587 poll_data->revent = GST_MESSAGE_TYPE (message);
588 g_main_loop_quit (poll_data->loop);
590 /* keep the message on the queue */
593 /* pop and unref the message */
599 poll_timeout (GstBusPollData * poll_data)
601 g_main_loop_quit (poll_data->loop);
603 /* returning FALSE will remove the source id */
608 poll_destroy (GstBusPollData * poll_data)
610 poll_data->source_running = FALSE;
611 if (!poll_data->timeout_id) {
612 g_main_loop_unref (poll_data->loop);
618 poll_destroy_timeout (GstBusPollData * poll_data)
620 poll_data->timeout_id = 0;
621 if (!poll_data->source_running) {
622 g_main_loop_unref (poll_data->loop);
630 * @events: a mask of #GstMessageType, representing the set of message types to
632 * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely.
634 * Poll the bus for events. Will block while waiting for events to come. You can
635 * specify a maximum time to poll with the @timeout parameter. If @timeout is
636 * negative, this function will block indefinitely.
638 * This function will enter the default mainloop while polling.
640 * Returns: The type of the message that was received, or GST_MESSAGE_UNKNOWN if
641 * the poll timed out. The message will remain in the bus queue; you will need
642 * to gst_bus_pop() it off before entering gst_bus_poll() again.
645 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
647 GstBusPollData *poll_data;
651 poll_data = g_new0 (GstBusPollData, 1);
652 g_return_val_if_fail (poll_data != NULL, GST_MESSAGE_UNKNOWN);
654 poll_data->source_running = TRUE;
655 poll_data->loop = g_main_loop_new (NULL, FALSE);
656 poll_data->events = events;
657 poll_data->revent = GST_MESSAGE_UNKNOWN;
660 poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
661 timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
662 (GDestroyNotify) poll_destroy_timeout);
664 poll_data->timeout_id = 0;
666 id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE,
667 (GstBusHandler) poll_handler, poll_data, (GDestroyNotify) poll_destroy);
668 g_main_loop_run (poll_data->loop);
669 ret = poll_data->revent;
671 if (poll_data->timeout_id)
672 g_source_remove (poll_data->timeout_id);
674 /* poll_data may get destroyed at any time now */
675 g_source_remove (id);
677 GST_DEBUG_OBJECT (bus, "finished poll with messagetype %d", ret);