6e1618cd248435131c34eb03c7e9407b22e71cc8
[platform/upstream/gstreamer.git] / gst / gstbus.c
1 /* GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  *
4  * gstbus.c: GstBus subsystem
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21 /**
22  * SECTION:gstbus
23  * @short_description: Asynchronous message bus subsystem
24  * @see_also: #GstMessage, #GstElement
25  *
26  * The #GstBus is an object responsible for delivering #GstMessages in
27  * a first-in first-out way from the streaming threads to the application.
28  * 
29  * Since the application typically only wants to deal with delivery of these
30  * messages from one thread, the GstBus will marshall the messages between
31  * different threads. This is important since the actual streaming of media
32  * is done in another thread than the application.
33  * 
34  * The GstBus provides support for #GSource based notifications. This makes it
35  * possible to handle the delivery in the glib mainloop.
36  * 
37  * A message is posted on the bus with the gst_bus_post() method. With the
38  * gst_bus_peek() and _pop() methods one can look at or retrieve a previously
39  * posted message.
40  * 
41  * The bus can be polled with the gst_bus_poll() method. This methods blocks
42  * up to the specified timeout value until one of the specified messages types
43  * is posted on the bus. The application can then _pop() the messages from the
44  * bus to handle them.
45  * Alternatively the application can register an asynchronous bus function using
46  * gst_bus_add_watch_full() or gst_bus_add_watch(). This function will receive
47  * messages a short while after they have been posted.
48  * 
49  * It is also possible to get messages from the bus without any thread 
50  * marshalling with the gst_bus_set_sync_handler() method. This makes it
51  * possible to react to a message in the same thread that posted the
52  * message on the bus. This should only be used if the application is able
53  * to deal with messages from different threads.
54  *
55  * Every #GstBin has one bus.
56  *
57  * Note that a #GstBin will set its bus into flushing state when changing from
58  * READY to NULL state.
59  */
60
61 #include <errno.h>
62 #include <unistd.h>
63 #include <sys/types.h>
64 #include <sys/socket.h>
65
66 #include "gst_private.h"
67 #include "gstinfo.h"
68
69 #include "gstbus.h"
70
71 /* bus signals */
72 enum
73 {
74   SYNC_MESSAGE,
75   ASYNC_MESSAGE,
76   /* add more above */
77   LAST_SIGNAL
78 };
79
80 static void gst_bus_class_init (GstBusClass * klass);
81 static void gst_bus_init (GstBus * bus);
82 static void gst_bus_dispose (GObject * object);
83
84 static void gst_bus_set_property (GObject * object, guint prop_id,
85     const GValue * value, GParamSpec * pspec);
86 static void gst_bus_get_property (GObject * object, guint prop_id,
87     GValue * value, GParamSpec * pspec);
88
89 static GstObjectClass *parent_class = NULL;
90 static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
91
92 GType
93 gst_bus_get_type (void)
94 {
95   static GType bus_type = 0;
96
97   if (!bus_type) {
98     static const GTypeInfo bus_info = {
99       sizeof (GstBusClass),
100       NULL,
101       NULL,
102       (GClassInitFunc) gst_bus_class_init,
103       NULL,
104       NULL,
105       sizeof (GstBus),
106       0,
107       (GInstanceInitFunc) gst_bus_init,
108       NULL
109     };
110
111     bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
112   }
113   return bus_type;
114 }
115
116 /* fixme: do something about this */
117 static void
118 marshal_VOID__MINIOBJECT (GClosure * closure, GValue * return_value,
119     guint n_param_values, const GValue * param_values, gpointer invocation_hint,
120     gpointer marshal_data)
121 {
122   typedef void (*marshalfunc_VOID__MINIOBJECT) (gpointer obj, gpointer arg1,
123       gpointer data2);
124   register marshalfunc_VOID__MINIOBJECT callback;
125   register GCClosure *cc = (GCClosure *) closure;
126   register gpointer data1, data2;
127
128   g_return_if_fail (n_param_values == 2);
129
130   if (G_CCLOSURE_SWAP_DATA (closure)) {
131     data1 = closure->data;
132     data2 = g_value_peek_pointer (param_values + 0);
133   } else {
134     data1 = g_value_peek_pointer (param_values + 0);
135     data2 = closure->data;
136   }
137   callback =
138       (marshalfunc_VOID__MINIOBJECT) (marshal_data ? marshal_data : cc->
139       callback);
140
141   callback (data1, gst_value_get_mini_object (param_values + 1), data2);
142 }
143
144 static void
145 gst_bus_class_init (GstBusClass * klass)
146 {
147   GObjectClass *gobject_class;
148   GstObjectClass *gstobject_class;
149
150   gobject_class = (GObjectClass *) klass;
151   gstobject_class = (GstObjectClass *) klass;
152
153   parent_class = g_type_class_peek_parent (klass);
154
155   if (!g_thread_supported ())
156     g_thread_init (NULL);
157
158   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
159   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
160   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
161
162   /**
163    * GstBus::sync-message:
164    * @bus: the object which received the signal
165    * @message: the message that has been posted synchronously
166    *
167    * A message has been posted on the bus. This signal is emited from the
168    * thread that posted the message so one has to be carefull with locking.
169    */
170   gst_bus_signals[SYNC_MESSAGE] =
171       g_signal_new ("sync-message", G_TYPE_FROM_CLASS (klass),
172       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
173       G_STRUCT_OFFSET (GstBusClass, sync_message), NULL, NULL,
174       marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
175
176   /**
177    * GstBus::message:
178    * @bus: the object which received the signal
179    * @message: the message that has been posted asynchronously
180    *
181    * A message has been posted on the bus. This signal is emited from a
182    * GSource added to the mainloop.
183    */
184   gst_bus_signals[ASYNC_MESSAGE] =
185       g_signal_new ("message", G_TYPE_FROM_CLASS (klass),
186       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
187       G_STRUCT_OFFSET (GstBusClass, message), NULL, NULL,
188       marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
189 }
190
191 static void
192 gst_bus_init (GstBus * bus)
193 {
194   bus->queue = g_queue_new ();
195   bus->queue_lock = g_mutex_new ();
196
197   GST_DEBUG_OBJECT (bus, "created");
198
199   return;
200 }
201
202 static void
203 gst_bus_dispose (GObject * object)
204 {
205   GstBus *bus;
206
207   bus = GST_BUS (object);
208
209   if (bus->queue) {
210     GstMessage *message;
211
212     g_mutex_lock (bus->queue_lock);
213     do {
214       message = g_queue_pop_head (bus->queue);
215       if (message)
216         gst_message_unref (message);
217     } while (message != NULL);
218     g_queue_free (bus->queue);
219     bus->queue = NULL;
220     g_mutex_unlock (bus->queue_lock);
221     g_mutex_free (bus->queue_lock);
222     bus->queue_lock = NULL;
223   }
224
225   G_OBJECT_CLASS (parent_class)->dispose (object);
226 }
227
228 static void
229 gst_bus_set_property (GObject * object, guint prop_id,
230     const GValue * value, GParamSpec * pspec)
231 {
232   GstBus *bus;
233
234   bus = GST_BUS (object);
235
236   switch (prop_id) {
237     default:
238       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
239       break;
240   }
241 }
242
243 static void
244 gst_bus_get_property (GObject * object, guint prop_id,
245     GValue * value, GParamSpec * pspec)
246 {
247   GstBus *bus;
248
249   bus = GST_BUS (object);
250
251   switch (prop_id) {
252     default:
253       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
254       break;
255   }
256 }
257
258 /**
259  * gst_bus_new:
260  *
261  * Creates a new #GstBuus instance.
262  *
263  * Returns: a new #GstBus instance
264  */
265 GstBus *
266 gst_bus_new (void)
267 {
268   GstBus *result;
269
270   result = g_object_new (gst_bus_get_type (), NULL);
271
272   return result;
273 }
274
275 /**
276  * gst_bus_post:
277  * @bus: a #GstBus to post on
278  * @message: The #GstMessage to post
279  *
280  * Post a message on the given bus. Ownership of the message
281  * is taken by the bus.
282  *
283  * Returns: TRUE if the message could be posted.
284  *
285  * MT safe.
286  */
287 gboolean
288 gst_bus_post (GstBus * bus, GstMessage * message)
289 {
290   GstBusSyncReply reply = GST_BUS_PASS;
291   GstBusSyncHandler handler;
292   gpointer handler_data;
293
294   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
295   g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
296
297   GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus, type %s",
298       message, gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
299
300   GST_LOCK (bus);
301   /* check if the bus is flushing */
302   if (GST_FLAG_IS_SET (bus, GST_BUS_FLUSHING))
303     goto is_flushing;
304
305   handler = bus->sync_handler;
306   handler_data = bus->sync_handler_data;
307   GST_UNLOCK (bus);
308
309   /* first call the sync handler if it is installed */
310   if (handler)
311     reply = handler (bus, message, handler_data);
312
313   /* now see what we should do with the message */
314   switch (reply) {
315     case GST_BUS_DROP:
316       /* drop the message */
317       GST_DEBUG_OBJECT (bus, "[msg %p] dropped", message);
318       break;
319     case GST_BUS_PASS:
320       /* pass the message to the async queue, refcount passed in the queue */
321       GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
322       g_mutex_lock (bus->queue_lock);
323       g_queue_push_tail (bus->queue, message);
324       g_mutex_unlock (bus->queue_lock);
325       GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
326
327       /* FIXME cannot assume the source is only in the default context */
328       g_main_context_wakeup (NULL);
329
330       break;
331     case GST_BUS_ASYNC:
332     {
333       /* async delivery, we need a mutex and a cond to block
334        * on */
335       GMutex *lock = g_mutex_new ();
336       GCond *cond = g_cond_new ();
337
338       GST_MESSAGE_COND (message) = cond;
339       GST_MESSAGE_GET_LOCK (message) = lock;
340
341       GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
342
343       /* now we lock the message mutex, send the message to the async
344        * queue. When the message is handled by the app and destroyed,
345        * the cond will be signalled and we can continue */
346       g_mutex_lock (lock);
347       g_mutex_lock (bus->queue_lock);
348       g_queue_push_tail (bus->queue, message);
349       g_mutex_unlock (bus->queue_lock);
350
351       /* FIXME cannot assume the source is only in the default context */
352       g_main_context_wakeup (NULL);
353
354       /* now block till the message is freed */
355       g_cond_wait (cond, lock);
356       g_mutex_unlock (lock);
357
358       GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
359
360       g_mutex_free (lock);
361       g_cond_free (cond);
362       break;
363     }
364     default:
365       g_warning ("invalid return from bus sync handler");
366       break;
367   }
368   return TRUE;
369
370   /* ERRORS */
371 is_flushing:
372   {
373     GST_DEBUG_OBJECT (bus, "bus is flushing");
374     gst_message_unref (message);
375     GST_UNLOCK (bus);
376
377     return FALSE;
378   }
379 }
380
381 /**
382  * gst_bus_have_pending:
383  * @bus: a #GstBus to check
384  *
385  * Check if there are pending messages on the bus that
386  * should be handled.
387  *
388  * Returns: TRUE if there are messages on the bus to be handled.
389  *
390  * MT safe.
391  */
392 gboolean
393 gst_bus_have_pending (GstBus * bus)
394 {
395   gboolean result;
396
397   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
398
399   g_mutex_lock (bus->queue_lock);
400   /* see if there is a message on the bus */
401   result = !g_queue_is_empty (bus->queue);
402   g_mutex_unlock (bus->queue_lock);
403
404   return result;
405 }
406
407 /**
408  * gst_bus_set_flushing:
409  * @bus: a #GstBus
410  * @flushing: whether or not to flush the bus
411  *
412  * If @flushing, flush out and unref any messages queued in the bus. Releases
413  * references to the message origin objects. Will flush future messages until
414  * gst_bus_set_flushing() sets @flushing to #FALSE.
415  *
416  * MT safe.
417  */
418 void
419 gst_bus_set_flushing (GstBus * bus, gboolean flushing)
420 {
421   GstMessage *message;
422
423   GST_LOCK (bus);
424
425   if (flushing) {
426     GST_FLAG_SET (bus, GST_BUS_FLUSHING);
427
428     GST_DEBUG_OBJECT (bus, "set bus flushing");
429
430     while ((message = gst_bus_pop (bus)))
431       gst_message_unref (message);
432   } else {
433     GST_DEBUG_OBJECT (bus, "unset bus flushing");
434     GST_FLAG_UNSET (bus, GST_BUS_FLUSHING);
435   }
436
437   GST_UNLOCK (bus);
438 }
439
440
441 /**
442  * gst_bus_pop:
443  * @bus: a #GstBus to pop
444  *
445  * Get a message from the bus.
446  *
447  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
448  *
449  * MT safe.
450  */
451 GstMessage *
452 gst_bus_pop (GstBus * bus)
453 {
454   GstMessage *message;
455
456   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
457
458   GST_DEBUG_OBJECT (bus, "%d messages on the bus",
459       g_queue_get_length (bus->queue));
460
461   g_mutex_lock (bus->queue_lock);
462   message = g_queue_pop_head (bus->queue);
463   g_mutex_unlock (bus->queue_lock);
464
465   if (message)
466     GST_DEBUG_OBJECT (bus, "pop on bus, got message %p, %s", message,
467         gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
468   else
469     GST_DEBUG_OBJECT (bus, "pop on bus, no message");
470
471   return message;
472 }
473
474 /**
475  * gst_bus_peek:
476  * @bus: a #GstBus
477  *
478  * Peek the message on the top of the bus' queue. The message will remain 
479  * on the bus' message queue. A reference is returned, and needs to be unreffed
480  * by the caller.
481  *
482  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
483  *
484  * MT safe.
485  */
486 GstMessage *
487 gst_bus_peek (GstBus * bus)
488 {
489   GstMessage *message;
490
491   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
492
493   g_mutex_lock (bus->queue_lock);
494   message = g_queue_peek_head (bus->queue);
495   if (message)
496     gst_message_ref (message);
497   g_mutex_unlock (bus->queue_lock);
498
499   GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
500
501   return message;
502 }
503
504 /**
505  * gst_bus_set_sync_handler:
506  * @bus: a #GstBus to install the handler on
507  * @func: The handler function to install
508  * @data: User data that will be sent to the handler function.
509  *
510  * Sets the synchronous handler on the bus. The function will be called
511  * every time a new message is posted on the bus. Note that the function
512  * will be called in the same thread context as the posting object. This
513  * function is usually only called by the creator of the bus. Applications
514  * should handle messages asynchronously using the gst_bus watch and poll
515  * functions.
516  */
517 void
518 gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
519 {
520   g_return_if_fail (GST_IS_BUS (bus));
521
522   GST_LOCK (bus);
523
524   /* Assert if the user attempts to replace an existing sync_handler,
525    * other than to clear it */
526   g_assert (func == NULL || bus->sync_handler == NULL);
527
528   bus->sync_handler = func;
529   bus->sync_handler_data = data;
530   GST_UNLOCK (bus);
531 }
532
533 /* GSource for the bus
534  */
535 typedef struct
536 {
537   GSource source;
538   GstBus *bus;
539 } GstBusSource;
540
541 static gboolean
542 gst_bus_source_prepare (GSource * source, gint * timeout)
543 {
544   GstBusSource *bsrc = (GstBusSource *) source;
545
546   *timeout = -1;
547   return gst_bus_have_pending (bsrc->bus);
548 }
549
550 static gboolean
551 gst_bus_source_check (GSource * source)
552 {
553   GstBusSource *bsrc = (GstBusSource *) source;
554
555   return gst_bus_have_pending (bsrc->bus);
556 }
557
558 static gboolean
559 gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
560     gpointer user_data)
561 {
562   GstBusFunc handler = (GstBusFunc) callback;
563   GstBusSource *bsource = (GstBusSource *) source;
564   GstMessage *message;
565   gboolean keep;
566   GstBus *bus;
567
568   g_return_val_if_fail (bsource != NULL, FALSE);
569
570   bus = bsource->bus;
571
572   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
573
574   message = gst_bus_pop (bus);
575   g_return_val_if_fail (message != NULL, FALSE);
576
577   if (!handler)
578     goto no_handler;
579
580   GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
581
582   keep = handler (bus, message, user_data);
583   gst_message_unref (message);
584
585   GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, keep);
586
587   return keep;
588
589 no_handler:
590   {
591     g_warning ("GstBus watch dispatched without callback\n"
592         "You must call g_source_connect().");
593     gst_message_unref (message);
594     return FALSE;
595   }
596 }
597
598 static void
599 gst_bus_source_finalize (GSource * source)
600 {
601   GstBusSource *bsource = (GstBusSource *) source;
602
603   gst_object_unref (bsource->bus);
604   bsource->bus = NULL;
605 }
606
607 static GSourceFuncs gst_bus_source_funcs = {
608   gst_bus_source_prepare,
609   gst_bus_source_check,
610   gst_bus_source_dispatch,
611   gst_bus_source_finalize
612 };
613
614 /**
615  * gst_bus_create_watch:
616  * @bus: a #GstBus to create the watch for
617  *
618  * Create watch for this bus. The GSource will be dispatched whenever
619  * a message is on the bus. After the GSource is dispatched, the 
620  * message is popped off the bus and unreffed.
621  *
622  * Returns: A #GSource that can be added to a mainloop.
623  */
624 GSource *
625 gst_bus_create_watch (GstBus * bus)
626 {
627   GstBusSource *source;
628
629   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
630
631   source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
632       sizeof (GstBusSource));
633   gst_object_ref (bus);
634   source->bus = bus;
635
636   return (GSource *) source;
637 }
638
639 /**
640  * gst_bus_add_watch_full:
641  * @bus: a #GstBus to create the watch for.
642  * @priority: The priority of the watch.
643  * @func: A function to call when a message is received.
644  * @user_data: user data passed to @func.
645  * @notify: the function to call when the source is removed.
646  *
647  * Adds a bus watch to the default main context with the given priority. 
648  * If the func returns FALSE, the source will be removed. 
649  *
650  * When the func is called, the message belongs to the caller; if you want to 
651  * keep a copy of it, call gst_message_ref() before leaving the func.
652  *
653  * The watch can be removed using #g_source_remove().
654  *
655  * Returns: The event source id.
656  *
657  * MT safe.
658  */
659 guint
660 gst_bus_add_watch_full (GstBus * bus, gint priority,
661     GstBusFunc func, gpointer user_data, GDestroyNotify notify)
662 {
663   guint id;
664   GSource *source;
665
666   g_return_val_if_fail (GST_IS_BUS (bus), 0);
667
668   source = gst_bus_create_watch (bus);
669
670   if (priority != G_PRIORITY_DEFAULT)
671     g_source_set_priority (source, priority);
672
673   g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
674
675   id = g_source_attach (source, NULL);
676   g_source_unref (source);
677
678   GST_DEBUG_OBJECT (bus, "New source %p", source);
679   return id;
680 }
681
682 /**
683  * gst_bus_add_watch:
684  * @bus: a #GstBus to create the watch for
685  * @func: A function to call when a message is received.
686  * @user_data: user data passed to @func.
687  *
688  * Adds a bus watch to the default main context with the default priority.
689  *
690  * The watch can be removed using #g_source_remove().
691  *
692  * Returns: The event source id.
693  *
694  * MT safe.
695  */
696 guint
697 gst_bus_add_watch (GstBus * bus, GstBusFunc func, gpointer user_data)
698 {
699   return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, func,
700       user_data, NULL);
701 }
702
703 typedef struct
704 {
705   GMainLoop *loop;
706   guint timeout_id;
707   gboolean source_running;
708   GstMessageType events;
709   GstMessage *message;
710 } GstBusPollData;
711
712 static gboolean
713 poll_func (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
714 {
715   if (!g_main_loop_is_running (poll_data->loop)) {
716     GST_DEBUG ("mainloop %p not running", poll_data->loop);
717     return TRUE;
718   }
719
720   if (GST_MESSAGE_TYPE (message) & poll_data->events) {
721     g_return_val_if_fail (poll_data->message == NULL, FALSE);
722     /* keep ref to message */
723     poll_data->message = gst_message_ref (message);
724     GST_DEBUG ("mainloop %p quit", poll_data->loop);
725     g_main_loop_quit (poll_data->loop);
726   }
727
728   /* we always keep the source alive so that we don't accidentialy
729    * free the poll_data */
730   return TRUE;
731 }
732
733 static gboolean
734 poll_timeout (GstBusPollData * poll_data)
735 {
736   GST_DEBUG ("mainloop %p quit", poll_data->loop);
737   g_main_loop_quit (poll_data->loop);
738
739   /* we don't remove the GSource as this would free our poll_data,
740    * which we still need */
741   return TRUE;
742 }
743
744 static void
745 poll_destroy (GstBusPollData * poll_data)
746 {
747   poll_data->source_running = FALSE;
748   if (!poll_data->timeout_id) {
749     g_main_loop_unref (poll_data->loop);
750     g_free (poll_data);
751   }
752 }
753
754 static void
755 poll_destroy_timeout (GstBusPollData * poll_data)
756 {
757   poll_data->timeout_id = 0;
758   if (!poll_data->source_running) {
759     g_main_loop_unref (poll_data->loop);
760     g_free (poll_data);
761   }
762 }
763
764 /**
765  * gst_bus_poll:
766  * @bus: a #GstBus
767  * @events: a mask of #GstMessageType, representing the set of message types to
768  * poll for.
769  * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely.
770  *
771  * Poll the bus for events. Will block while waiting for events to come. You can
772  * specify a maximum time to poll with the @timeout parameter. If @timeout is
773  * negative, this function will block indefinitely.
774  *
775  * All messages not in @events will be popped off the bus and will be ignored.
776  *
777  * This function will enter the default mainloop while polling.
778  *
779  * Returns: The message that was received, or NULL if the poll timed out. 
780  * The message is taken from the bus and needs to be unreffed after usage.
781  */
782 GstMessage *
783 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
784 {
785   GstBusPollData *poll_data;
786   GstMessage *ret;
787   guint id;
788
789   poll_data = g_new0 (GstBusPollData, 1);
790   poll_data->source_running = TRUE;
791   poll_data->loop = g_main_loop_new (NULL, FALSE);
792   poll_data->events = events;
793   poll_data->message = NULL;
794
795   if (timeout >= 0)
796     poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
797         timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
798         (GDestroyNotify) poll_destroy_timeout);
799   else
800     poll_data->timeout_id = 0;
801
802   id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT,
803       (GstBusFunc) poll_func, poll_data, (GDestroyNotify) poll_destroy);
804
805   GST_DEBUG ("running mainloop %p", poll_data->loop);
806   g_main_loop_run (poll_data->loop);
807   GST_DEBUG ("mainloop stopped %p", poll_data->loop);
808   /* holds a ref */
809   ret = poll_data->message;
810
811   if (poll_data->timeout_id)
812     g_source_remove (poll_data->timeout_id);
813
814   /* poll_data may get destroyed at any time now */
815   g_source_remove (id);
816
817   GST_DEBUG_OBJECT (bus, "finished poll with message %p", ret);
818
819   return ret;
820 }
821
822 /**
823  * gst_bus_async_signal_func:
824  * @bus: a #GstBus
825  * @message: the message received
826  * @data: user data
827  *
828  * A helper GstBusFunc that can be used to convert all asynchronous messages 
829  * into signals.
830  *
831  * Returns: TRUE
832  */
833 gboolean
834 gst_bus_async_signal_func (GstBus * bus, GstMessage * message, gpointer data)
835 {
836   GQuark detail = 0;
837
838   g_return_val_if_fail (GST_IS_BUS (bus), TRUE);
839   g_return_val_if_fail (message != NULL, TRUE);
840
841   detail = gst_message_type_to_quark (GST_MESSAGE_TYPE (message));
842
843   g_signal_emit (bus, gst_bus_signals[ASYNC_MESSAGE], detail, message);
844
845   /* we never remove this source based on signal emission return values */
846   return TRUE;
847 }
848
849 /**
850  * gst_bus_sync_signal_handler:
851  * @bus: a #GstBus
852  * @message: the message received
853  * @data: user data
854  *
855  * A helper GstBusSyncHandler that can be used to convert all synchronous
856  * messages into signals.
857  *
858  * Returns: GST_BUS_PASS
859  */
860 GstBusSyncReply
861 gst_bus_sync_signal_handler (GstBus * bus, GstMessage * message, gpointer data)
862 {
863   GQuark detail = 0;
864
865   g_return_val_if_fail (GST_IS_BUS (bus), GST_BUS_DROP);
866   g_return_val_if_fail (message != NULL, GST_BUS_DROP);
867
868   detail = gst_message_type_to_quark (GST_MESSAGE_TYPE (message));
869
870   g_signal_emit (bus, gst_bus_signals[SYNC_MESSAGE], detail, message);
871
872   return GST_BUS_PASS;
873 }
874
875 /**
876  * gst_bus_add_signal_watch:
877  * @bus: a #GstBus on which you want to recieve the "message" signal
878  *
879  * Adds a bus signal watch to the default main context with the default priority.
880  * After calling this statement, the bus will emit the message signal for each
881  * message posted on the bus.
882  *
883  * This function may be called multiple times. To clean up, the caller is
884  * responsible for calling gst_bus_remove_signal_watch() as many times as this
885  * function is called.
886  *
887  * MT safe.
888  */
889 void
890 gst_bus_add_signal_watch (GstBus * bus)
891 {
892   g_return_if_fail (GST_IS_BUS (bus));
893
894   /* I know the callees don't take this lock, so go ahead and abuse it */
895   GST_LOCK (bus);
896
897   if (bus->num_signal_watchers > 0)
898     goto done;
899
900   g_assert (bus->signal_watch_id == 0);
901
902   bus->signal_watch_id =
903       gst_bus_add_watch (bus, gst_bus_async_signal_func, NULL);
904
905 done:
906
907   bus->num_signal_watchers++;
908
909   GST_UNLOCK (bus);
910 }
911
912 /**
913  * gst_bus_remove_signal_watch:
914  * @bus: a #GstBus you previously added a signal watch to
915  *
916  * Removes a signal watch previously added with gst_bus_add_signal_watch().
917  *
918  * MT safe.
919  */
920 void
921 gst_bus_remove_signal_watch (GstBus * bus)
922 {
923   g_return_if_fail (GST_IS_BUS (bus));
924
925   /* I know the callees don't take this lock, so go ahead and abuse it */
926   GST_LOCK (bus);
927
928   if (bus->num_signal_watchers == 0)
929     goto error;
930
931   bus->num_signal_watchers--;
932
933   if (bus->num_signal_watchers > 0)
934     goto done;
935
936   g_source_remove (bus->signal_watch_id);
937   bus->signal_watch_id = 0;
938
939 done:
940   GST_UNLOCK (bus);
941   return;
942
943 error:
944   {
945     g_critical ("Bus %s has no signal watches attached", GST_OBJECT_NAME (bus));
946     GST_UNLOCK (bus);
947     return;
948   }
949 }