/* GStreamer
 * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
 *
 * gstbus.c: GstBus subsystem
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */
25 #include <sys/types.h>
26 #include <sys/socket.h>
28 #include "gst_private.h"
38 static void gst_bus_class_init (GstBusClass * klass);
39 static void gst_bus_init (GstBus * bus);
40 static void gst_bus_dispose (GObject * object);
42 static void gst_bus_set_property (GObject * object, guint prop_id,
43 const GValue * value, GParamSpec * pspec);
44 static void gst_bus_get_property (GObject * object, guint prop_id,
45 GValue * value, GParamSpec * pspec);
47 static GstObjectClass *parent_class = NULL;
49 /* static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; */
52 gst_bus_get_type (void)
54 static GType bus_type = 0;
57 static const GTypeInfo bus_info = {
61 (GClassInitFunc) gst_bus_class_init,
66 (GInstanceInitFunc) gst_bus_init,
70 bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
76 gst_bus_class_init (GstBusClass * klass)
78 GObjectClass *gobject_class;
79 GstObjectClass *gstobject_class;
81 gobject_class = (GObjectClass *) klass;
82 gstobject_class = (GstObjectClass *) klass;
84 parent_class = g_type_class_ref (GST_TYPE_OBJECT);
86 if (!g_thread_supported ())
89 gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
90 gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
91 gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
95 gst_bus_init (GstBus * bus)
97 bus->queue = g_queue_new ();
98 bus->queue_lock = g_mutex_new ();
104 gst_bus_dispose (GObject * object)
108 bus = GST_BUS (object);
111 g_mutex_lock (bus->queue_lock);
112 g_queue_free (bus->queue);
114 g_mutex_unlock (bus->queue_lock);
115 g_mutex_free (bus->queue_lock);
116 bus->queue_lock = NULL;
119 G_OBJECT_CLASS (parent_class)->dispose (object);
123 gst_bus_set_property (GObject * object, guint prop_id,
124 const GValue * value, GParamSpec * pspec)
128 bus = GST_BUS (object);
132 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
138 gst_bus_get_property (GObject * object, guint prop_id,
139 GValue * value, GParamSpec * pspec)
143 bus = GST_BUS (object);
147 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
157 result = g_object_new (gst_bus_get_type (), NULL);
164 * @bus: a #GstBus to post on
165 * @message: The #GstMessage to post
167 * Post a message on the given bus.
169 * Returns: TRUE if the message could be posted.
174 gst_bus_post (GstBus * bus, GstMessage * message)
176 GstBusSyncReply reply = GST_BUS_PASS;
177 GstBusSyncHandler handler;
178 gpointer handler_data;
180 g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
181 g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
183 GST_DEBUG_OBJECT (bus, "posting message on bus, type %d",
184 GST_MESSAGE_TYPE (message));
187 if (GST_FLAG_IS_SET (bus, GST_BUS_FLUSHING)) {
188 gst_message_unref (message);
193 handler = bus->sync_handler;
194 handler_data = bus->sync_handler_data;
198 /* first call the sync handler if it is installed */
200 reply = handler (bus, message, handler_data);
203 /* now see what we should do with the message */
206 /* drop the message */
209 /* pass the message to the async queue */
210 g_mutex_lock (bus->queue_lock);
211 g_queue_push_tail (bus->queue, message);
212 g_mutex_unlock (bus->queue_lock);
214 /* FIXME cannot assume the source is only in the default context */
215 g_main_context_wakeup (NULL);
220 /* async delivery, we need a mutex and a cond to block
222 GMutex *lock = g_mutex_new ();
223 GCond *cond = g_cond_new ();
225 GST_MESSAGE_COND (message) = cond;
226 GST_MESSAGE_GET_LOCK (message) = lock;
228 GST_DEBUG ("waiting for async delivery of message %p", message);
230 /* now we lock the message mutex, send the message to the async
231 * queue. When the message is handled by the app and destroyed,
232 * the cond will be signalled and we can continue */
234 g_mutex_lock (bus->queue_lock);
235 g_queue_push_tail (bus->queue, message);
236 g_mutex_unlock (bus->queue_lock);
238 /* FIXME cannot assume the source is only in the default context */
239 g_main_context_wakeup (NULL);
241 /* now block till the message is freed */
242 g_cond_wait (cond, lock);
243 g_mutex_unlock (lock);
245 GST_DEBUG ("message %p delivered asynchronously", message);
257 * gst_bus_have_pending:
258 * @bus: a #GstBus to check
260 * Check if there are pending messages on the bus that should be
263 * Returns: TRUE if there are messages on the bus to be handled.
268 gst_bus_have_pending (GstBus * bus)
272 g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
274 g_mutex_lock (bus->queue_lock);
275 length = g_queue_get_length (bus->queue);
276 g_mutex_unlock (bus->queue_lock);
282 * gst_bus_set_flushing:
284 * @flushing: whether or not to flush the bus
286 * If @flushing, flush out and unref any messages queued in the bus. Releases
287 * references to the message origin objects. Will flush future messages until
288 * gst_bus_set_flushing() sets @flushing to #FALSE.
293 gst_bus_set_flushing (GstBus * bus, gboolean flushing)
300 GST_FLAG_SET (bus, GST_BUS_FLUSHING);
302 while ((message = gst_bus_pop (bus)))
303 gst_message_unref (message);
305 GST_FLAG_UNSET (bus, GST_BUS_FLUSHING);
313 * @bus: a #GstBus to pop
315 * Get a message from the bus.
317 * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
322 gst_bus_pop (GstBus * bus)
326 g_return_val_if_fail (GST_IS_BUS (bus), NULL);
328 g_mutex_lock (bus->queue_lock);
329 message = g_queue_pop_head (bus->queue);
330 g_mutex_unlock (bus->queue_lock);
339 * Peek the message on the top of the bus' queue. The bus maintains ownership of
340 * the message, and the message will remain on the bus' message queue.
342 * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
347 gst_bus_peek (GstBus * bus)
351 g_return_val_if_fail (GST_IS_BUS (bus), NULL);
353 g_mutex_lock (bus->queue_lock);
354 message = g_queue_peek_head (bus->queue);
355 g_mutex_unlock (bus->queue_lock);
361 * gst_bus_set_sync_handler:
362 * @bus: a #GstBus to install the handler on
363 * @func: The handler function to install
364 * @data: User data that will be sent to the handler function.
366 * Install a synchronous handler on the bus. The function will be called
367 * every time a new message is posted on the bus. Note that the function
368 * will be called in the same thread context as the posting object.
371 gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
373 g_return_if_fail (GST_IS_BUS (bus));
376 bus->sync_handler = func;
377 bus->sync_handler_data = data;
381 /* GSource for the bus
390 gst_bus_source_prepare (GSource * source, gint * timeout)
393 return gst_bus_have_pending (((GstBusSource *) source)->bus);
397 gst_bus_source_check (GSource * source)
399 return gst_bus_have_pending (((GstBusSource *) source)->bus);
403 gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
406 GstBusHandler handler = (GstBusHandler) callback;
407 GstBusSource *bsource = (GstBusSource *) source;
409 gboolean needs_pop = TRUE;
411 g_return_val_if_fail (GST_IS_BUS (bsource->bus), FALSE);
413 message = gst_bus_peek (bsource->bus);
415 g_return_val_if_fail (message != NULL, TRUE);
418 g_warning ("GstBus watch dispatched without callback\n"
419 "You must call g_source_connect().");
423 needs_pop = handler (bsource->bus, message, user_data);
426 gst_message_unref (gst_bus_pop (bsource->bus));
432 gst_bus_source_finalize (GSource * source)
434 GstBusSource *bsource = (GstBusSource *) source;
436 gst_object_unref (GST_OBJECT_CAST (bsource->bus));
440 static GSourceFuncs gst_bus_source_funcs = {
441 gst_bus_source_prepare,
442 gst_bus_source_check,
443 gst_bus_source_dispatch,
444 gst_bus_source_finalize
448 * gst_bus_create_watch:
449 * @bus: a #GstBus to create the watch for
451 * Create watch for this bus.
453 * Returns: A #GSource that can be added to a mainloop.
456 gst_bus_create_watch (GstBus * bus)
458 GstBusSource *source;
460 g_return_val_if_fail (GST_IS_BUS (bus), NULL);
462 source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
463 sizeof (GstBusSource));
464 gst_object_ref (GST_OBJECT_CAST (bus));
467 return (GSource *) source;
471 * gst_bus_add_watch_full:
472 * @bus: a #GstBus to create the watch for.
473 * @priority: The priority of the watch.
474 * @handler: A function to call when a message is received.
475 * @user_data: user data passed to @handler.
476 * @notify: the function to call when the source is removed.
478 * Adds the bus to the mainloop with the given priority. If the handler returns
479 * TRUE, the message will then be popped off the queue. When the handler is
480 * called, the message belongs to the caller; if you want to keep a copy of it,
481 * call gst_message_ref before leaving the handler.
483 * Returns: The event source id.
488 gst_bus_add_watch_full (GstBus * bus, gint priority,
489 GstBusHandler handler, gpointer user_data, GDestroyNotify notify)
494 g_return_val_if_fail (GST_IS_BUS (bus), 0);
496 source = gst_bus_create_watch (bus);
498 if (priority != G_PRIORITY_DEFAULT)
499 g_source_set_priority (source, priority);
501 g_source_set_callback (source, (GSourceFunc) handler, user_data, notify);
503 id = g_source_attach (source, NULL);
504 g_source_unref (source);
511 * @bus: a #GstBus to create the watch for
512 * @handler: A function to call when a message is received.
513 * @user_data: user data passed to @handler.
515 * Adds the bus to the mainloop with the default priority.
517 * Returns: The event source id.
522 gst_bus_add_watch (GstBus * bus, GstBusHandler handler, gpointer user_data)
524 return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, handler, user_data,
532 GstMessageType events;
533 GstMessageType revent;
537 poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
539 if (GST_MESSAGE_TYPE (message) & poll_data->events) {
540 poll_data->revent = GST_MESSAGE_TYPE (message);
541 if (g_main_loop_is_running (poll_data->loop))
542 g_main_loop_quit (poll_data->loop);
543 /* keep the message on the queue */
546 /* pop and unref the message */
552 poll_timeout (GstBusPollData * poll_data)
554 poll_data->timeout_id = 0;
555 g_main_loop_quit (poll_data->loop);
556 /* returning FALSE will remove the source id */
563 * @events: a mask of #GstMessageType, representing the set of message types to
565 * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely.
567 * Poll the bus for events. Will block while waiting for events to come. You can
568 * specify a maximum time to poll with the @timeout parameter. If @timeout is
569 * negative, this function will block indefinitely.
571 * Returns: The type of the message that was received, or GST_MESSAGE_UNKNOWN if
572 * the poll timed out. The message will remain in the bus queue; you will need
573 * to gst_bus_pop() it off before entering gst_bus_poll() again.
576 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
578 GstBusPollData *poll_data;
582 poll_data = g_new0 (GstBusPollData, 1);
584 poll_data->timeout_id = g_timeout_add (timeout / GST_MSECOND,
585 (GSourceFunc) poll_timeout, poll_data);
586 poll_data->loop = g_main_loop_new (NULL, FALSE);
587 poll_data->events = events;
588 poll_data->revent = GST_MESSAGE_UNKNOWN;
590 id = gst_bus_add_watch (bus, (GstBusHandler) poll_handler, poll_data);
591 g_main_loop_run (poll_data->loop);
592 g_source_remove (id);
594 ret = poll_data->revent;
596 if (poll_data->timeout_id)
597 g_source_remove (poll_data->timeout_id);
598 g_main_loop_unref (poll_data->loop);