check/gst/gstbin.c: Change for new bus API.
[platform/upstream/gstreamer.git] / gst / gstbus.c
1 /* GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  *
4  * gstbus.c: GstBus subsystem
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21 /**
22  * SECTION:gstbus
23  * @short_description: Asynchronous message bus subsystem
24  * @see_also: #GstMessage, #GstElement
25  *
26  * The #GstBus is an object responsible for delivering #GstMessages in
27  * a first-in first-out way from the streaming threads to the application.
28  * 
29  * Since the application typically only wants to deal with delivery of these
30  * messages from one thread, the GstBus will marshall the messages between
31  * different threads. This is important since the actual streaming of media
32  * is done in another thread than the application.
33  * 
34  * The GstBus provides support for #GSource based notifications. This makes it
35  * possible to handle the delivery in the glib mainloop.
36  * 
37  * A message is posted on the bus with the gst_bus_post() method. With the
38  * gst_bus_peek() and _pop() methods one can look at or retrieve a previously
39  * posted message.
40  * 
41  * The bus can be polled with the gst_bus_poll() method. This methods blocks
42  * up to the specified timeout value until one of the specified messages types
43  * is posted on the bus. The application can then _pop() the messages from the
44  * bus to handle them.
45  * Alternatively the application can register an asynchronous bus function using
46  * gst_bus_add_watch_full() or gst_bus_add_watch(). This function will receive
47  * messages a short while after they have been posted.
48  * 
49  * It is also possible to get messages from the bus without any thread 
50  * marshalling with the gst_bus_set_sync_handler() method. This makes it
51  * possible to react to a message in the same thread that posted the
52  * message on the bus. This should only be used if the application is able
53  * to deal with messages from different threads.
54  *
55  * Every #GstBin has one bus.
56  *
57  * Note that a #GstBin will set its bus into flushing state when changing from
58  * READY to NULL state.
59  */
60
61 #include <errno.h>
62 #include <unistd.h>
63 #include <sys/types.h>
64 #include <sys/socket.h>
65
66 #include "gst_private.h"
67 #include "gstinfo.h"
68
69 #include "gstbus.h"
70
71 /* bus signals */
72 enum
73 {
74   SYNC_MESSAGE,
75   ASYNC_MESSAGE,
76   /* add more above */
77   LAST_SIGNAL
78 };
79
80 static void gst_bus_class_init (GstBusClass * klass);
81 static void gst_bus_init (GstBus * bus);
82 static void gst_bus_dispose (GObject * object);
83
84 static void gst_bus_set_property (GObject * object, guint prop_id,
85     const GValue * value, GParamSpec * pspec);
86 static void gst_bus_get_property (GObject * object, guint prop_id,
87     GValue * value, GParamSpec * pspec);
88
89 static GstObjectClass *parent_class = NULL;
90 static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
91
92 GType
93 gst_bus_get_type (void)
94 {
95   static GType bus_type = 0;
96
97   if (!bus_type) {
98     static const GTypeInfo bus_info = {
99       sizeof (GstBusClass),
100       NULL,
101       NULL,
102       (GClassInitFunc) gst_bus_class_init,
103       NULL,
104       NULL,
105       sizeof (GstBus),
106       0,
107       (GInstanceInitFunc) gst_bus_init,
108       NULL
109     };
110
111     bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
112   }
113   return bus_type;
114 }
115
116 /* fixme: do something about this */
117 static void
118 marshal_VOID__MINIOBJECT (GClosure * closure, GValue * return_value,
119     guint n_param_values, const GValue * param_values, gpointer invocation_hint,
120     gpointer marshal_data)
121 {
122   typedef void (*marshalfunc_VOID__MINIOBJECT) (gpointer obj, gpointer arg1,
123       gpointer data2);
124   register marshalfunc_VOID__MINIOBJECT callback;
125   register GCClosure *cc = (GCClosure *) closure;
126   register gpointer data1, data2;
127
128   g_return_if_fail (n_param_values == 2);
129
130   if (G_CCLOSURE_SWAP_DATA (closure)) {
131     data1 = closure->data;
132     data2 = g_value_peek_pointer (param_values + 0);
133   } else {
134     data1 = g_value_peek_pointer (param_values + 0);
135     data2 = closure->data;
136   }
137   callback =
138       (marshalfunc_VOID__MINIOBJECT) (marshal_data ? marshal_data : cc->
139       callback);
140
141   callback (data1, gst_value_get_mini_object (param_values + 1), data2);
142 }
143
144 static void
145 gst_bus_class_init (GstBusClass * klass)
146 {
147   GObjectClass *gobject_class;
148   GstObjectClass *gstobject_class;
149
150   gobject_class = (GObjectClass *) klass;
151   gstobject_class = (GstObjectClass *) klass;
152
153   parent_class = g_type_class_peek_parent (klass);
154
155   if (!g_thread_supported ())
156     g_thread_init (NULL);
157
158   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
159   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
160   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
161
162   /**
163    * GstBus::sync-message:
164    * @bus: the object which received the signal
165    * @message: the message that has been posted synchronously
166    *
167    * A message has been posted on the bus. This signal is emited from the
168    * thread that posted the message so one has to be carefull with locking.
169    */
170   gst_bus_signals[SYNC_MESSAGE] =
171       g_signal_new ("sync-message", G_TYPE_FROM_CLASS (klass),
172       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
173       G_STRUCT_OFFSET (GstBusClass, sync_message), NULL, NULL,
174       marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
175
176   /**
177    * GstBus::message:
178    * @bus: the object which received the signal
179    * @message: the message that has been posted asynchronously
180    *
181    * A message has been posted on the bus. This signal is emited from a
182    * GSource added to the mainloop.
183    */
184   gst_bus_signals[ASYNC_MESSAGE] =
185       g_signal_new ("message", G_TYPE_FROM_CLASS (klass),
186       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
187       G_STRUCT_OFFSET (GstBusClass, message), NULL, NULL,
188       marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
189 }
190
191 static void
192 gst_bus_init (GstBus * bus)
193 {
194   bus->queue = g_queue_new ();
195   bus->queue_lock = g_mutex_new ();
196
197   GST_DEBUG_OBJECT (bus, "created");
198
199   return;
200 }
201
202 static void
203 gst_bus_dispose (GObject * object)
204 {
205   GstBus *bus;
206
207   bus = GST_BUS (object);
208
209   if (bus->queue) {
210     GstMessage *message;
211
212     g_mutex_lock (bus->queue_lock);
213     do {
214       message = g_queue_pop_head (bus->queue);
215       if (message)
216         gst_message_unref (message);
217     } while (message != NULL);
218     g_queue_free (bus->queue);
219     bus->queue = NULL;
220     g_mutex_unlock (bus->queue_lock);
221     g_mutex_free (bus->queue_lock);
222     bus->queue_lock = NULL;
223   }
224
225   G_OBJECT_CLASS (parent_class)->dispose (object);
226 }
227
228 static void
229 gst_bus_set_property (GObject * object, guint prop_id,
230     const GValue * value, GParamSpec * pspec)
231 {
232   GstBus *bus;
233
234   bus = GST_BUS (object);
235
236   switch (prop_id) {
237     default:
238       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
239       break;
240   }
241 }
242
243 static void
244 gst_bus_get_property (GObject * object, guint prop_id,
245     GValue * value, GParamSpec * pspec)
246 {
247   GstBus *bus;
248
249   bus = GST_BUS (object);
250
251   switch (prop_id) {
252     default:
253       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
254       break;
255   }
256 }
257
258 /**
259  * gst_bus_new:
260  *
261  * Creates a new #GstBuus instance.
262  *
263  * Returns: a new #GstBus instance
264  */
265 GstBus *
266 gst_bus_new (void)
267 {
268   GstBus *result;
269
270   result = g_object_new (gst_bus_get_type (), NULL);
271
272   return result;
273 }
274
275 /**
276  * gst_bus_post:
277  * @bus: a #GstBus to post on
278  * @message: The #GstMessage to post
279  *
280  * Post a message on the given bus. Ownership of the message
281  * is taken by the bus.
282  *
283  * Returns: TRUE if the message could be posted.
284  *
285  * MT safe.
286  */
287 gboolean
288 gst_bus_post (GstBus * bus, GstMessage * message)
289 {
290   GstBusSyncReply reply = GST_BUS_PASS;
291   GstBusSyncHandler handler;
292   gpointer handler_data;
293
294   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
295   g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
296
297   GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus, type %s",
298       message, gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
299
300   GST_LOCK (bus);
301   /* check if the bus is flushing */
302   if (GST_FLAG_IS_SET (bus, GST_BUS_FLUSHING))
303     goto is_flushing;
304
305   handler = bus->sync_handler;
306   handler_data = bus->sync_handler_data;
307   GST_UNLOCK (bus);
308
309   /* first call the sync handler if it is installed */
310   if (handler)
311     reply = handler (bus, message, handler_data);
312
313   /* now see what we should do with the message */
314   switch (reply) {
315     case GST_BUS_DROP:
316       /* drop the message */
317       GST_DEBUG_OBJECT (bus, "[msg %p] dropped", message);
318       break;
319     case GST_BUS_PASS:
320       /* pass the message to the async queue, refcount passed in the queue */
321       GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
322       g_mutex_lock (bus->queue_lock);
323       g_queue_push_tail (bus->queue, message);
324       g_mutex_unlock (bus->queue_lock);
325       GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
326
327       /* FIXME cannot assume the source is only in the default context */
328       g_main_context_wakeup (NULL);
329
330       break;
331     case GST_BUS_ASYNC:
332     {
333       /* async delivery, we need a mutex and a cond to block
334        * on */
335       GMutex *lock = g_mutex_new ();
336       GCond *cond = g_cond_new ();
337
338       GST_MESSAGE_COND (message) = cond;
339       GST_MESSAGE_GET_LOCK (message) = lock;
340
341       GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
342
343       /* now we lock the message mutex, send the message to the async
344        * queue. When the message is handled by the app and destroyed,
345        * the cond will be signalled and we can continue */
346       g_mutex_lock (lock);
347       g_mutex_lock (bus->queue_lock);
348       g_queue_push_tail (bus->queue, message);
349       g_mutex_unlock (bus->queue_lock);
350
351       /* FIXME cannot assume the source is only in the default context */
352       g_main_context_wakeup (NULL);
353
354       /* now block till the message is freed */
355       g_cond_wait (cond, lock);
356       g_mutex_unlock (lock);
357
358       GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
359
360       g_mutex_free (lock);
361       g_cond_free (cond);
362       break;
363     }
364     default:
365       g_warning ("invalid return from bus sync handler");
366       break;
367   }
368   return TRUE;
369
370   /* ERRORS */
371 is_flushing:
372   {
373     GST_DEBUG_OBJECT (bus, "bus is flushing");
374     gst_message_unref (message);
375     GST_UNLOCK (bus);
376
377     return FALSE;
378   }
379 }
380
381 /**
382  * gst_bus_have_pending:
383  * @bus: a #GstBus to check
384  *
385  * Check if there are pending messages on the bus that
386  * should be handled.
387  *
388  * Returns: TRUE if there are messages on the bus to be handled.
389  *
390  * MT safe.
391  */
392 gboolean
393 gst_bus_have_pending (GstBus * bus)
394 {
395   gboolean result;
396
397   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
398
399   g_mutex_lock (bus->queue_lock);
400   /* see if there is a message on the bus */
401   result = !g_queue_is_empty (bus->queue);
402   g_mutex_unlock (bus->queue_lock);
403
404   return result;
405 }
406
407 /**
408  * gst_bus_set_flushing:
409  * @bus: a #GstBus
410  * @flushing: whether or not to flush the bus
411  *
412  * If @flushing, flush out and unref any messages queued in the bus. Releases
413  * references to the message origin objects. Will flush future messages until
414  * gst_bus_set_flushing() sets @flushing to #FALSE.
415  *
416  * MT safe.
417  */
418 void
419 gst_bus_set_flushing (GstBus * bus, gboolean flushing)
420 {
421   GstMessage *message;
422
423   GST_LOCK (bus);
424
425   if (flushing) {
426     GST_FLAG_SET (bus, GST_BUS_FLUSHING);
427
428     GST_DEBUG_OBJECT (bus, "set bus flushing");
429
430     while ((message = gst_bus_pop (bus)))
431       gst_message_unref (message);
432   } else {
433     GST_DEBUG_OBJECT (bus, "unset bus flushing");
434     GST_FLAG_UNSET (bus, GST_BUS_FLUSHING);
435   }
436
437   GST_UNLOCK (bus);
438 }
439
440
441 /**
442  * gst_bus_pop:
443  * @bus: a #GstBus to pop
444  *
445  * Get a message from the bus.
446  *
447  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
448  *
449  * MT safe.
450  */
451 GstMessage *
452 gst_bus_pop (GstBus * bus)
453 {
454   GstMessage *message;
455
456   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
457
458   g_mutex_lock (bus->queue_lock);
459   message = g_queue_pop_head (bus->queue);
460   g_mutex_unlock (bus->queue_lock);
461
462   GST_DEBUG_OBJECT (bus, "pop on bus, got message %p", message);
463
464   return message;
465 }
466
467 /**
468  * gst_bus_peek:
469  * @bus: a #GstBus
470  *
471  * Peek the message on the top of the bus' queue. The message will remain 
472  * on the bus' message queue. A reference is returned, and needs to be unreffed
473  * by the caller.
474  *
475  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
476  *
477  * MT safe.
478  */
479 GstMessage *
480 gst_bus_peek (GstBus * bus)
481 {
482   GstMessage *message;
483
484   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
485
486   g_mutex_lock (bus->queue_lock);
487   message = g_queue_peek_head (bus->queue);
488   if (message)
489     gst_message_ref (message);
490   g_mutex_unlock (bus->queue_lock);
491
492   GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
493
494   return message;
495 }
496
497 /**
498  * gst_bus_set_sync_handler:
499  * @bus: a #GstBus to install the handler on
500  * @func: The handler function to install
501  * @data: User data that will be sent to the handler function.
502  *
503  * Sets the synchronous handler on the bus. The function will be called
504  * every time a new message is posted on the bus. Note that the function
505  * will be called in the same thread context as the posting object. This
506  * function is usually only called by the creator of the bus. Applications
507  * should handle messages asynchronously using the gst_bus watch and poll
508  * functions.
509  */
510 void
511 gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
512 {
513   g_return_if_fail (GST_IS_BUS (bus));
514
515   GST_LOCK (bus);
516
517   /* Assert if the user attempts to replace an existing sync_handler,
518    * other than to clear it */
519   g_assert (func == NULL || bus->sync_handler == NULL);
520
521   bus->sync_handler = func;
522   bus->sync_handler_data = data;
523   GST_UNLOCK (bus);
524 }
525
526 /* GSource for the bus
527  */
528 typedef struct
529 {
530   GSource source;
531   GstBus *bus;
532 } GstBusSource;
533
534 static gboolean
535 gst_bus_source_prepare (GSource * source, gint * timeout)
536 {
537   GstBusSource *bsrc = (GstBusSource *) source;
538
539   *timeout = -1;
540   return gst_bus_have_pending (bsrc->bus);
541 }
542
543 static gboolean
544 gst_bus_source_check (GSource * source)
545 {
546   GstBusSource *bsrc = (GstBusSource *) source;
547
548   return gst_bus_have_pending (bsrc->bus);
549 }
550
551 static gboolean
552 gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
553     gpointer user_data)
554 {
555   GstBusFunc handler = (GstBusFunc) callback;
556   GstBusSource *bsource = (GstBusSource *) source;
557   GstMessage *message;
558   gboolean keep;
559   GstBus *bus;
560
561   g_return_val_if_fail (bsource != NULL, FALSE);
562
563   bus = bsource->bus;
564
565   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
566
567   message = gst_bus_pop (bus);
568   g_return_val_if_fail (message != NULL, FALSE);
569
570   if (!handler)
571     goto no_handler;
572
573   GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
574
575   keep = handler (bus, message, user_data);
576   gst_message_unref (message);
577
578   GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, keep);
579
580   return keep;
581
582 no_handler:
583   {
584     g_warning ("GstBus watch dispatched without callback\n"
585         "You must call g_source_connect().");
586     gst_message_unref (message);
587     return FALSE;
588   }
589 }
590
591 static void
592 gst_bus_source_finalize (GSource * source)
593 {
594   GstBusSource *bsource = (GstBusSource *) source;
595
596   gst_object_unref (bsource->bus);
597   bsource->bus = NULL;
598 }
599
600 static GSourceFuncs gst_bus_source_funcs = {
601   gst_bus_source_prepare,
602   gst_bus_source_check,
603   gst_bus_source_dispatch,
604   gst_bus_source_finalize
605 };
606
607 /**
608  * gst_bus_create_watch:
609  * @bus: a #GstBus to create the watch for
610  *
611  * Create watch for this bus. The GSource will be dispatched whenever
612  * a message is on the bus. After the GSource is dispatched, the 
613  * message is popped off the bus and unreffed.
614  *
615  * Returns: A #GSource that can be added to a mainloop.
616  */
617 GSource *
618 gst_bus_create_watch (GstBus * bus)
619 {
620   GstBusSource *source;
621
622   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
623
624   source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
625       sizeof (GstBusSource));
626   gst_object_ref (bus);
627   source->bus = bus;
628
629   return (GSource *) source;
630 }
631
632 /**
633  * gst_bus_add_watch_full:
634  * @bus: a #GstBus to create the watch for.
635  * @priority: The priority of the watch.
636  * @func: A function to call when a message is received.
637  * @user_data: user data passed to @func.
638  * @notify: the function to call when the source is removed.
639  *
640  * Adds a bus watch to the default main context with the given priority. 
641  * If the func returns FALSE, the source will be removed. 
642  *
643  * When the func is called, the message belongs to the caller; if you want to 
644  * keep a copy of it, call gst_message_ref() before leaving the func.
645  *
646  * The watch can be removed using #g_source_remove().
647  *
648  * Returns: The event source id.
649  *
650  * MT safe.
651  */
652 guint
653 gst_bus_add_watch_full (GstBus * bus, gint priority,
654     GstBusFunc func, gpointer user_data, GDestroyNotify notify)
655 {
656   guint id;
657   GSource *source;
658
659   g_return_val_if_fail (GST_IS_BUS (bus), 0);
660
661   source = gst_bus_create_watch (bus);
662
663   if (priority != G_PRIORITY_DEFAULT)
664     g_source_set_priority (source, priority);
665
666   g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
667
668   id = g_source_attach (source, NULL);
669   g_source_unref (source);
670
671   GST_DEBUG_OBJECT (bus, "New source %p", source);
672   return id;
673 }
674
675 /**
676  * gst_bus_add_watch:
677  * @bus: a #GstBus to create the watch for
678  * @func: A function to call when a message is received.
679  * @user_data: user data passed to @func.
680  *
681  * Adds a bus watch to the default main context with the default priority.
682  *
683  * The watch can be removed using #g_source_remove().
684  *
685  * Returns: The event source id.
686  *
687  * MT safe.
688  */
689 guint
690 gst_bus_add_watch (GstBus * bus, GstBusFunc func, gpointer user_data)
691 {
692   return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, func,
693       user_data, NULL);
694 }
695
696 typedef struct
697 {
698   GMainLoop *loop;
699   guint timeout_id;
700   gboolean source_running;
701   GstMessageType events;
702   GstMessage *message;
703 } GstBusPollData;
704
705 static gboolean
706 poll_func (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
707 {
708   if (!g_main_loop_is_running (poll_data->loop)) {
709     GST_DEBUG ("mainloop %p not running", poll_data->loop);
710     return TRUE;
711   }
712
713   if (GST_MESSAGE_TYPE (message) & poll_data->events) {
714     g_return_val_if_fail (poll_data->message == NULL, FALSE);
715     /* keep ref to message */
716     poll_data->message = gst_message_ref (message);
717     GST_DEBUG ("mainloop %p quit", poll_data->loop);
718     g_main_loop_quit (poll_data->loop);
719   }
720
721   /* we always keep the source alive so that we don't accidentialy
722    * free the poll_data */
723   return TRUE;
724 }
725
726 static gboolean
727 poll_timeout (GstBusPollData * poll_data)
728 {
729   GST_DEBUG ("mainloop %p quit", poll_data->loop);
730   g_main_loop_quit (poll_data->loop);
731
732   /* we don't remove the GSource as this would free our poll_data,
733    * which we still need */
734   return TRUE;
735 }
736
737 static void
738 poll_destroy (GstBusPollData * poll_data)
739 {
740   poll_data->source_running = FALSE;
741   if (!poll_data->timeout_id) {
742     g_main_loop_unref (poll_data->loop);
743     g_free (poll_data);
744   }
745 }
746
747 static void
748 poll_destroy_timeout (GstBusPollData * poll_data)
749 {
750   poll_data->timeout_id = 0;
751   if (!poll_data->source_running) {
752     g_main_loop_unref (poll_data->loop);
753     g_free (poll_data);
754   }
755 }
756
757 /**
758  * gst_bus_poll:
759  * @bus: a #GstBus
760  * @events: a mask of #GstMessageType, representing the set of message types to
761  * poll for.
762  * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely.
763  *
764  * Poll the bus for events. Will block while waiting for events to come. You can
765  * specify a maximum time to poll with the @timeout parameter. If @timeout is
766  * negative, this function will block indefinitely.
767  *
768  * All messages not in @events will be popped off the bus and will be ignored.
769  *
770  * This function will enter the default mainloop while polling.
771  *
772  * Returns: The message that was received, or NULL if the poll timed out. 
773  * The message is taken from the bus and needs to be unreffed after usage.
774  */
775 GstMessage *
776 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
777 {
778   GstBusPollData *poll_data;
779   GstMessage *ret;
780   guint id;
781
782   poll_data = g_new0 (GstBusPollData, 1);
783   poll_data->source_running = TRUE;
784   poll_data->loop = g_main_loop_new (NULL, FALSE);
785   poll_data->events = events;
786   poll_data->message = NULL;
787
788   if (timeout >= 0)
789     poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
790         timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
791         (GDestroyNotify) poll_destroy_timeout);
792   else
793     poll_data->timeout_id = 0;
794
795   id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT,
796       (GstBusFunc) poll_func, poll_data, (GDestroyNotify) poll_destroy);
797
798   GST_DEBUG ("running mainloop %p", poll_data->loop);
799   g_main_loop_run (poll_data->loop);
800   GST_DEBUG ("mainloop stopped %p", poll_data->loop);
801   /* holds a ref */
802   ret = poll_data->message;
803
804   if (poll_data->timeout_id)
805     g_source_remove (poll_data->timeout_id);
806
807   /* poll_data may get destroyed at any time now */
808   g_source_remove (id);
809
810   GST_DEBUG_OBJECT (bus, "finished poll with message %p", ret);
811
812   return ret;
813 }
814
815 /**
816  * gst_bus_async_signal_func:
817  * @bus: a #GstBus
818  * @message: the message received
819  * @data: user data
820  *
821  * A helper GstBusFunc that can be used to convert all asynchronous messages 
822  * into signals.
823  *
824  * Returns: TRUE
825  */
826 gboolean
827 gst_bus_async_signal_func (GstBus * bus, GstMessage * message, gpointer data)
828 {
829   GQuark detail = 0;
830
831   g_return_val_if_fail (GST_IS_BUS (bus), TRUE);
832   g_return_val_if_fail (message != NULL, TRUE);
833
834   detail = gst_message_type_to_quark (GST_MESSAGE_TYPE (message));
835
836   g_signal_emit (bus, gst_bus_signals[ASYNC_MESSAGE], detail, message);
837
838   /* we never remove this source based on signal emission return values */
839   return TRUE;
840 }
841
842 /**
843  * gst_bus_sync_signal_handler:
844  * @bus: a #GstBus
845  * @message: the message received
846  * @data: user data
847  *
848  * A helper GstBusSyncHandler that can be used to convert all synchronous
849  * messages into signals.
850  *
851  * Returns: GST_BUS_PASS
852  */
853 GstBusSyncReply
854 gst_bus_sync_signal_handler (GstBus * bus, GstMessage * message, gpointer data)
855 {
856   GQuark detail = 0;
857
858   g_return_val_if_fail (GST_IS_BUS (bus), GST_BUS_DROP);
859   g_return_val_if_fail (message != NULL, GST_BUS_DROP);
860
861   detail = gst_message_type_to_quark (GST_MESSAGE_TYPE (message));
862
863   g_signal_emit (bus, gst_bus_signals[SYNC_MESSAGE], detail, message);
864
865   return GST_BUS_PASS;
866 }
867
868 /**
869  * gst_bus_add_signal_watch:
870  * @bus: a #GstBus to create the watch for
871  *
872  * Adds a bus signal watch to the default main context with the default priority.
873  * After calling this statement, the bus will emit the message signal for each
874  * message posted on the bus.
875  *
876  * The watch can be removed using #g_source_remove().
877  *
878  * Returns: The event source id.
879  *
880  * MT safe.
881  */
882 guint
883 gst_bus_add_signal_watch (GstBus * bus)
884 {
885   g_return_val_if_fail (GST_IS_BUS (bus), 0);
886
887   return gst_bus_add_watch (bus, gst_bus_async_signal_func, NULL);
888 }