check/gst/gstbin.c (test_message_state_changed_children): Style fix..
[platform/upstream/gstreamer.git] / gst / gstbus.c
1 /* GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  *
4  * gstbus.c: GstBus subsystem
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbus
24  * @short_description: Asynchronous message bus subsystem
25  * @see_also: #GstMessage, #GstElement
26  *
27  * The #GstBus is an object responsible for delivering #GstMessages in
28  * a first-in first-out way from the streaming threads to the application.
29  *
30  * Since the application typically only wants to deal with delivery of these
31  * messages from one thread, the GstBus will marshall the messages between
32  * different threads. This is important since the actual streaming of media
33  * is done in another thread than the application.
34  *
35  * The GstBus provides support for #GSource based notifications. This makes it
36  * possible to handle the delivery in the glib mainloop.
37  *
38  * The #GSource callback function gst_bus_async_signal_func() can be used to
39  * convert all bus messages into signal emissions.
40  *
41  * A message is posted on the bus with the gst_bus_post() method. With the
42  * gst_bus_peek() and gst_bus_pop() methods one can look at or retrieve a previously
43  * posted message.
44  *
45  * The bus can be polled with the gst_bus_poll() method. This methods blocks
46  * up to the specified timeout value until one of the specified messages types
47  * is posted on the bus. The application can then _pop() the messages from the
48  * bus to handle them.
49  * Alternatively the application can register an asynchronous bus function
50  * using gst_bus_add_watch_full() or gst_bus_add_watch(). This function will
51  * receive messages a short while after they have been posted.
52  *
53  * It is also possible to get messages from the bus without any thread
54  * marshalling with the gst_bus_set_sync_handler() method. This makes it
55  * possible to react to a message in the same thread that posted the
56  * message on the bus. This should only be used if the application is able
57  * to deal with messages from different threads.
58  *
59  * Every #GstPipeline has one bus.
60  *
61  * Note that a #GstPipeline will set its bus into flushing state when changing
62  * from READY to NULL state.
63  *
64  * Last reviewed on 2005-10-28 (0.9.4)
65  */
66
67 #include <errno.h>
68 #ifdef HAVE_UNISTD_H
69 #  include <unistd.h>
70 #endif
71 #include <sys/types.h>
72
73 #include "gst_private.h"
74 #include "gstinfo.h"
75
76 #include "gstbus.h"
77
78 #define GST_CAT_DEFAULT GST_CAT_BUS
79 /* bus signals */
80 enum
81 {
82   SYNC_MESSAGE,
83   ASYNC_MESSAGE,
84   /* add more above */
85   LAST_SIGNAL
86 };
87
88 static void gst_bus_class_init (GstBusClass * klass);
89 static void gst_bus_init (GstBus * bus);
90 static void gst_bus_dispose (GObject * object);
91
92 static void gst_bus_set_property (GObject * object, guint prop_id,
93     const GValue * value, GParamSpec * pspec);
94 static void gst_bus_get_property (GObject * object, guint prop_id,
95     GValue * value, GParamSpec * pspec);
96
97 static GstObjectClass *parent_class = NULL;
98 static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
99
100 /* the context we wakeup when we posted a message on the bus */
101 static GMainContext *main_context;
102
103 GType
104 gst_bus_get_type (void)
105 {
106   static GType bus_type = 0;
107
108   if (!bus_type) {
109     static const GTypeInfo bus_info = {
110       sizeof (GstBusClass),
111       NULL,
112       NULL,
113       (GClassInitFunc) gst_bus_class_init,
114       NULL,
115       NULL,
116       sizeof (GstBus),
117       0,
118       (GInstanceInitFunc) gst_bus_init,
119       NULL
120     };
121
122     bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
123   }
124   return bus_type;
125 }
126
127 /* fixme: do something about this */
128 static void
129 marshal_VOID__MINIOBJECT (GClosure * closure, GValue * return_value,
130     guint n_param_values, const GValue * param_values, gpointer invocation_hint,
131     gpointer marshal_data)
132 {
133   typedef void (*marshalfunc_VOID__MINIOBJECT) (gpointer obj, gpointer arg1,
134       gpointer data2);
135   register marshalfunc_VOID__MINIOBJECT callback;
136   register GCClosure *cc = (GCClosure *) closure;
137   register gpointer data1, data2;
138
139   g_return_if_fail (n_param_values == 2);
140
141   if (G_CCLOSURE_SWAP_DATA (closure)) {
142     data1 = closure->data;
143     data2 = g_value_peek_pointer (param_values + 0);
144   } else {
145     data1 = g_value_peek_pointer (param_values + 0);
146     data2 = closure->data;
147   }
148   callback =
149       (marshalfunc_VOID__MINIOBJECT) (marshal_data ? marshal_data : cc->
150       callback);
151
152   callback (data1, gst_value_get_mini_object (param_values + 1), data2);
153 }
154
155 static void
156 gst_bus_class_init (GstBusClass * klass)
157 {
158   GObjectClass *gobject_class;
159   GstObjectClass *gstobject_class;
160
161   gobject_class = (GObjectClass *) klass;
162   gstobject_class = (GstObjectClass *) klass;
163
164   parent_class = g_type_class_peek_parent (klass);
165
166   if (!g_thread_supported ())
167     g_thread_init (NULL);
168
169   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
170   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
171   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
172
173   /**
174    * GstBus::sync-message:
175    * @bus: the object which received the signal
176    * @message: the message that has been posted synchronously
177    *
178    * A message has been posted on the bus. This signal is emitted from the
179    * thread that posted the message so one has to be carefull with locking.
180    */
181   gst_bus_signals[SYNC_MESSAGE] =
182       g_signal_new ("sync-message", G_TYPE_FROM_CLASS (klass),
183       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
184       G_STRUCT_OFFSET (GstBusClass, sync_message), NULL, NULL,
185       marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
186
187   /**
188    * GstBus::message:
189    * @bus: the object which received the signal
190    * @message: the message that has been posted asynchronously
191    *
192    * A message has been posted on the bus. This signal is emitted from a
193    * GSource added to the mainloop.
194    */
195   gst_bus_signals[ASYNC_MESSAGE] =
196       g_signal_new ("message", G_TYPE_FROM_CLASS (klass),
197       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
198       G_STRUCT_OFFSET (GstBusClass, message), NULL, NULL,
199       marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
200
201   main_context = g_main_context_default ();
202 }
203
204 static void
205 gst_bus_init (GstBus * bus)
206 {
207   bus->queue = g_queue_new ();
208   bus->queue_lock = g_mutex_new ();
209
210   GST_DEBUG_OBJECT (bus, "created");
211 }
212
213 static void
214 gst_bus_dispose (GObject * object)
215 {
216   GstBus *bus;
217
218   bus = GST_BUS (object);
219
220   if (bus->queue) {
221     GstMessage *message;
222
223     g_mutex_lock (bus->queue_lock);
224     do {
225       message = g_queue_pop_head (bus->queue);
226       if (message)
227         gst_message_unref (message);
228     } while (message != NULL);
229     g_queue_free (bus->queue);
230     bus->queue = NULL;
231     g_mutex_unlock (bus->queue_lock);
232     g_mutex_free (bus->queue_lock);
233     bus->queue_lock = NULL;
234   }
235
236   G_OBJECT_CLASS (parent_class)->dispose (object);
237 }
238
239 static void
240 gst_bus_set_property (GObject * object, guint prop_id,
241     const GValue * value, GParamSpec * pspec)
242 {
243   GstBus *bus;
244
245   bus = GST_BUS (object);
246
247   switch (prop_id) {
248     default:
249       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
250       break;
251   }
252 }
253
254 static void
255 gst_bus_get_property (GObject * object, guint prop_id,
256     GValue * value, GParamSpec * pspec)
257 {
258   GstBus *bus;
259
260   bus = GST_BUS (object);
261
262   switch (prop_id) {
263     default:
264       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
265       break;
266   }
267 }
268
269 /**
270  * gst_bus_new:
271  *
272  * Creates a new #GstBus instance.
273  *
274  * Returns: a new #GstBus instance
275  */
276 GstBus *
277 gst_bus_new (void)
278 {
279   GstBus *result;
280
281   result = g_object_new (gst_bus_get_type (), NULL);
282
283   return result;
284 }
285
286 /**
287  * gst_bus_post:
288  * @bus: a #GstBus to post on
289  * @message: The #GstMessage to post
290  *
291  * Post a message on the given bus. Ownership of the message
292  * is taken by the bus.
293  *
294  * Returns: TRUE if the message could be posted, FALSE if the bus is flushing.
295  *
296  * MT safe.
297  */
298 gboolean
299 gst_bus_post (GstBus * bus, GstMessage * message)
300 {
301   GstBusSyncReply reply = GST_BUS_PASS;
302   GstBusSyncHandler handler;
303   gpointer handler_data;
304
305   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
306   g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
307
308   GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus, type %s",
309       message, gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
310
311   GST_LOCK (bus);
312   /* check if the bus is flushing */
313   if (GST_OBJECT_FLAG_IS_SET (bus, GST_BUS_FLUSHING))
314     goto is_flushing;
315
316   handler = bus->sync_handler;
317   handler_data = bus->sync_handler_data;
318   GST_UNLOCK (bus);
319
320   /* first call the sync handler if it is installed */
321   if (handler)
322     reply = handler (bus, message, handler_data);
323
324   /* now see what we should do with the message */
325   switch (reply) {
326     case GST_BUS_DROP:
327       /* drop the message */
328       GST_DEBUG_OBJECT (bus, "[msg %p] dropped", message);
329       break;
330     case GST_BUS_PASS:
331       /* pass the message to the async queue, refcount passed in the queue */
332       GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
333       g_mutex_lock (bus->queue_lock);
334       g_queue_push_tail (bus->queue, message);
335       g_mutex_unlock (bus->queue_lock);
336       GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
337
338       /* FIXME cannot assume sources are only in the default context */
339       g_main_context_wakeup (main_context);
340
341       break;
342     case GST_BUS_ASYNC:
343     {
344       /* async delivery, we need a mutex and a cond to block
345        * on */
346       GMutex *lock = g_mutex_new ();
347       GCond *cond = g_cond_new ();
348
349       GST_MESSAGE_COND (message) = cond;
350       GST_MESSAGE_GET_LOCK (message) = lock;
351
352       GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
353
354       /* now we lock the message mutex, send the message to the async
355        * queue. When the message is handled by the app and destroyed,
356        * the cond will be signalled and we can continue */
357       g_mutex_lock (lock);
358       g_mutex_lock (bus->queue_lock);
359       g_queue_push_tail (bus->queue, message);
360       g_mutex_unlock (bus->queue_lock);
361
362       /* FIXME cannot assume sources are only in the default context */
363       g_main_context_wakeup (main_context);
364
365       /* now block till the message is freed */
366       g_cond_wait (cond, lock);
367       g_mutex_unlock (lock);
368
369       GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
370
371       g_mutex_free (lock);
372       g_cond_free (cond);
373       break;
374     }
375     default:
376       g_warning ("invalid return from bus sync handler");
377       break;
378   }
379   return TRUE;
380
381   /* ERRORS */
382 is_flushing:
383   {
384     GST_DEBUG_OBJECT (bus, "bus is flushing");
385     gst_message_unref (message);
386     GST_UNLOCK (bus);
387
388     return FALSE;
389   }
390 }
391
392 /**
393  * gst_bus_have_pending:
394  * @bus: a #GstBus to check
395  *
396  * Check if there are pending messages on the bus that
397  * should be handled.
398  *
399  * Returns: TRUE if there are messages on the bus to be handled, FALSE 
400  * otherwise.
401  *
402  * MT safe.
403  */
404 gboolean
405 gst_bus_have_pending (GstBus * bus)
406 {
407   gboolean result;
408
409   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
410
411   g_mutex_lock (bus->queue_lock);
412   /* see if there is a message on the bus */
413   result = !g_queue_is_empty (bus->queue);
414   g_mutex_unlock (bus->queue_lock);
415
416   return result;
417 }
418
419 /**
420  * gst_bus_set_flushing:
421  * @bus: a #GstBus
422  * @flushing: whether or not to flush the bus
423  *
424  * If @flushing, flush out and unref any messages queued in the bus. Releases
425  * references to the message origin objects. Will flush future messages until
426  * gst_bus_set_flushing() sets @flushing to #FALSE.
427  *
428  * MT safe.
429  */
430 void
431 gst_bus_set_flushing (GstBus * bus, gboolean flushing)
432 {
433   GstMessage *message;
434
435   GST_LOCK (bus);
436
437   if (flushing) {
438     GST_OBJECT_FLAG_SET (bus, GST_BUS_FLUSHING);
439
440     GST_DEBUG_OBJECT (bus, "set bus flushing");
441
442     while ((message = gst_bus_pop (bus)))
443       gst_message_unref (message);
444   } else {
445     GST_DEBUG_OBJECT (bus, "unset bus flushing");
446     GST_OBJECT_FLAG_UNSET (bus, GST_BUS_FLUSHING);
447   }
448
449   GST_UNLOCK (bus);
450 }
451
452
453 /**
454  * gst_bus_pop:
455  * @bus: a #GstBus to pop
456  *
457  * Get a message from the bus.
458  *
459  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
460  *
461  * MT safe.
462  */
463 GstMessage *
464 gst_bus_pop (GstBus * bus)
465 {
466   GstMessage *message;
467
468   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
469
470   g_mutex_lock (bus->queue_lock);
471   message = g_queue_pop_head (bus->queue);
472   if (message)
473     GST_DEBUG_OBJECT (bus, "pop from bus, have %d messages, got message %p, %s",
474         g_queue_get_length (bus->queue) + 1, message,
475         gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
476   else
477     GST_DEBUG_OBJECT (bus, "pop from bus, no messages");
478   g_mutex_unlock (bus->queue_lock);
479
480   return message;
481 }
482
483 /**
484  * gst_bus_peek:
485  * @bus: a #GstBus
486  *
487  * Peek the message on the top of the bus' queue. The message will remain
488  * on the bus' message queue. A reference is returned, and needs to be unreffed
489  * by the caller.
490  *
491  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
492  *
493  * MT safe.
494  */
495 GstMessage *
496 gst_bus_peek (GstBus * bus)
497 {
498   GstMessage *message;
499
500   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
501
502   g_mutex_lock (bus->queue_lock);
503   message = g_queue_peek_head (bus->queue);
504   if (message)
505     gst_message_ref (message);
506   g_mutex_unlock (bus->queue_lock);
507
508   GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
509
510   return message;
511 }
512
513 /**
514  * gst_bus_set_sync_handler:
515  * @bus: a #GstBus to install the handler on
516  * @func: The handler function to install
517  * @data: User data that will be sent to the handler function.
518  *
519  * Sets the synchronous handler on the bus. The function will be called
520  * every time a new message is posted on the bus. Note that the function
521  * will be called in the same thread context as the posting object. This
522  * function is usually only called by the creator of the bus. Applications
523  * should handle messages asynchronously using the gst_bus watch and poll
524  * functions.
525  *
526  * You cannot replace an existing sync_handler. You can pass NULL to this
527  * function, which will clear the existing handler.
528  */
529 void
530 gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
531 {
532   g_return_if_fail (GST_IS_BUS (bus));
533
534   GST_LOCK (bus);
535
536   /* Assert if the user attempts to replace an existing sync_handler,
537    * other than to clear it */
538   if (func != NULL && bus->sync_handler != NULL)
539     goto no_replace;
540
541   bus->sync_handler = func;
542   bus->sync_handler_data = data;
543   GST_UNLOCK (bus);
544
545   return;
546
547 no_replace:
548   {
549     GST_UNLOCK (bus);
550     g_warning ("cannot replace existing sync handler");
551     return;
552   }
553 }
554
555 /* GSource for the bus
556  */
557 typedef struct
558 {
559   GSource source;
560   GstBus *bus;
561 } GstBusSource;
562
563 static gboolean
564 gst_bus_source_prepare (GSource * source, gint * timeout)
565 {
566   GstBusSource *bsrc = (GstBusSource *) source;
567
568   *timeout = -1;
569   return gst_bus_have_pending (bsrc->bus);
570 }
571
572 static gboolean
573 gst_bus_source_check (GSource * source)
574 {
575   GstBusSource *bsrc = (GstBusSource *) source;
576
577   return gst_bus_have_pending (bsrc->bus);
578 }
579
580 static gboolean
581 gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
582     gpointer user_data)
583 {
584   GstBusFunc handler = (GstBusFunc) callback;
585   GstBusSource *bsource = (GstBusSource *) source;
586   GstMessage *message;
587   gboolean keep;
588   GstBus *bus;
589
590   g_return_val_if_fail (bsource != NULL, FALSE);
591
592   bus = bsource->bus;
593
594   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
595
596   message = gst_bus_pop (bus);
597   g_return_val_if_fail (message != NULL, FALSE);
598
599   if (!handler)
600     goto no_handler;
601
602   GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
603
604   keep = handler (bus, message, user_data);
605   gst_message_unref (message);
606
607   GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, keep);
608
609   return keep;
610
611 no_handler:
612   {
613     g_warning ("GstBus watch dispatched without callback\n"
614         "You must call g_source_connect().");
615     gst_message_unref (message);
616     return FALSE;
617   }
618 }
619
620 static void
621 gst_bus_source_finalize (GSource * source)
622 {
623   GstBusSource *bsource = (GstBusSource *) source;
624
625   gst_object_unref (bsource->bus);
626   bsource->bus = NULL;
627 }
628
629 static GSourceFuncs gst_bus_source_funcs = {
630   gst_bus_source_prepare,
631   gst_bus_source_check,
632   gst_bus_source_dispatch,
633   gst_bus_source_finalize
634 };
635
636 /**
637  * gst_bus_create_watch:
638  * @bus: a #GstBus to create the watch for
639  *
640  * Create watch for this bus. The GSource will be dispatched whenever
641  * a message is on the bus. After the GSource is dispatched, the
642  * message is popped off the bus and unreffed.
643  *
644  * Returns: A #GSource that can be added to a mainloop.
645  */
646 GSource *
647 gst_bus_create_watch (GstBus * bus)
648 {
649   GstBusSource *source;
650
651   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
652
653   source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
654       sizeof (GstBusSource));
655   gst_object_ref (bus);
656   source->bus = bus;
657
658   return (GSource *) source;
659 }
660
661 /**
662  * gst_bus_add_watch_full:
663  * @bus: a #GstBus to create the watch for.
664  * @priority: The priority of the watch.
665  * @func: A function to call when a message is received.
666  * @user_data: user data passed to @func.
667  * @notify: the function to call when the source is removed.
668  *
669  * Adds a bus watch to the default main context with the given priority.
670  * If the func returns FALSE, the source will be removed.
671  *
672  * When the func is called, the message belongs to the caller; if you want to
673  * keep a copy of it, call gst_message_ref() before leaving the func.
674  *
675  * The watch can be removed using #g_source_remove().
676  *
677  * Returns: The event source id.
678  *
679  * MT safe.
680  */
681 guint
682 gst_bus_add_watch_full (GstBus * bus, gint priority,
683     GstBusFunc func, gpointer user_data, GDestroyNotify notify)
684 {
685   guint id;
686   GSource *source;
687
688   g_return_val_if_fail (GST_IS_BUS (bus), 0);
689
690   source = gst_bus_create_watch (bus);
691
692   if (priority != G_PRIORITY_DEFAULT)
693     g_source_set_priority (source, priority);
694
695   g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
696
697   id = g_source_attach (source, NULL);
698   g_source_unref (source);
699
700   GST_DEBUG_OBJECT (bus, "New source %p", source);
701   return id;
702 }
703
704 /**
705  * gst_bus_add_watch:
706  * @bus: a #GstBus to create the watch for
707  * @func: A function to call when a message is received.
708  * @user_data: user data passed to @func.
709  *
710  * Adds a bus watch to the default main context with the default priority.
711  *
712  * The watch can be removed using #g_source_remove().
713  *
714  * Returns: The event source id.
715  *
716  * MT safe.
717  */
718 guint
719 gst_bus_add_watch (GstBus * bus, GstBusFunc func, gpointer user_data)
720 {
721   return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, func,
722       user_data, NULL);
723 }
724
725 typedef struct
726 {
727   GMainLoop *loop;
728   guint timeout_id;
729   gboolean source_running;
730   GstMessageType events;
731   GstMessage *message;
732 } GstBusPollData;
733
734 static void
735 poll_func (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
736 {
737   if (!g_main_loop_is_running (poll_data->loop)) {
738     GST_DEBUG ("mainloop %p not running", poll_data->loop);
739     return;
740   }
741
742   if (GST_MESSAGE_TYPE (message) & poll_data->events) {
743     g_return_if_fail (poll_data->message == NULL);
744     /* keep ref to message */
745     poll_data->message = gst_message_ref (message);
746     GST_DEBUG ("mainloop %p quit", poll_data->loop);
747     g_main_loop_quit (poll_data->loop);
748   }
749 }
750
751 static gboolean
752 poll_timeout (GstBusPollData * poll_data)
753 {
754   GST_DEBUG ("mainloop %p quit", poll_data->loop);
755   g_main_loop_quit (poll_data->loop);
756
757   /* we don't remove the GSource as this would free our poll_data,
758    * which we still need */
759   return TRUE;
760 }
761
762 static void
763 poll_destroy (GstBusPollData * poll_data, gpointer unused)
764 {
765   poll_data->source_running = FALSE;
766   if (!poll_data->timeout_id) {
767     g_main_loop_unref (poll_data->loop);
768     g_free (poll_data);
769   }
770 }
771
772 static void
773 poll_destroy_timeout (GstBusPollData * poll_data)
774 {
775   poll_data->timeout_id = 0;
776   if (!poll_data->source_running) {
777     g_main_loop_unref (poll_data->loop);
778     g_free (poll_data);
779   }
780 }
781
782 /**
783  * gst_bus_poll:
784  * @bus: a #GstBus
785  * @events: a mask of #GstMessageType, representing the set of message types to
786  * poll for.
787  * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely.
788  *
789  * Poll the bus for messages. Will block while waiting for messages to come.
790  * You can specify a maximum time to poll with the @timeout parameter. If
791  * @timeout is negative, this function will block indefinitely.
792  *
793  * All messages not in @events will be popped off the bus and will be ignored.
794  *
795  * Because poll is implemented using the "message" signal enabled by
796  * gst_bus_add_signal_watch(), calling gst_bus_poll() will cause the "message"
797  * signal to be emitted for every message that poll sees. Thus a "message"
798  * signal handler will see the same messages that this function sees -- neither
799  * will steal messages from the other.
800  *
801  * This function will run a main loop from the default main context when
802  * polling.
803  *
804  * Returns: The message that was received, or NULL if the poll timed out.
805  * The message is taken from the bus and needs to be unreffed after usage.
806  */
807 GstMessage *
808 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
809 {
810   GstBusPollData *poll_data;
811   GstMessage *ret;
812   gulong id;
813
814   poll_data = g_new0 (GstBusPollData, 1);
815   poll_data->source_running = TRUE;
816   poll_data->loop = g_main_loop_new (NULL, FALSE);
817   poll_data->events = events;
818   poll_data->message = NULL;
819
820   if (timeout >= 0)
821     poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
822         timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
823         (GDestroyNotify) poll_destroy_timeout);
824   else
825     poll_data->timeout_id = 0;
826
827   id = g_signal_connect_data (bus, "message", G_CALLBACK (poll_func), poll_data,
828       (GClosureNotify) poll_destroy, 0);
829
830   /* these can be nested, so it's ok */
831   gst_bus_add_signal_watch (bus);
832
833   GST_DEBUG ("running mainloop %p", poll_data->loop);
834   g_main_loop_run (poll_data->loop);
835   GST_DEBUG ("mainloop stopped %p", poll_data->loop);
836
837   gst_bus_remove_signal_watch (bus);
838
839   /* holds a ref */
840   ret = poll_data->message;
841
842   if (poll_data->timeout_id)
843     g_source_remove (poll_data->timeout_id);
844
845   /* poll_data will be freed now */
846   g_signal_handler_disconnect (bus, id);
847
848   GST_DEBUG_OBJECT (bus, "finished poll with message %p", ret);
849
850   return ret;
851 }
852
853 /**
854  * gst_bus_async_signal_func:
855  * @bus: a #GstBus
856  * @message: the #GstMessage received
857  * @data: user data
858  *
859  * A helper GstBusFunc that can be used to convert all asynchronous messages
860  * into signals.
861  *
862  * Returns: TRUE
863  */
864 gboolean
865 gst_bus_async_signal_func (GstBus * bus, GstMessage * message, gpointer data)
866 {
867   GQuark detail = 0;
868
869   g_return_val_if_fail (GST_IS_BUS (bus), TRUE);
870   g_return_val_if_fail (message != NULL, TRUE);
871
872   detail = gst_message_type_to_quark (GST_MESSAGE_TYPE (message));
873
874   g_signal_emit (bus, gst_bus_signals[ASYNC_MESSAGE], detail, message);
875
876   /* we never remove this source based on signal emission return values */
877   return TRUE;
878 }
879
880 /**
881  * gst_bus_sync_signal_handler:
882  * @bus: a #GstBus
883  * @message: the #GstMessage received
884  * @data: user data
885  *
886  * A helper GstBusSyncHandler that can be used to convert all synchronous
887  * messages into signals.
888  *
889  * Returns: GST_BUS_PASS
890  */
891 GstBusSyncReply
892 gst_bus_sync_signal_handler (GstBus * bus, GstMessage * message, gpointer data)
893 {
894   GQuark detail = 0;
895
896   g_return_val_if_fail (GST_IS_BUS (bus), GST_BUS_DROP);
897   g_return_val_if_fail (message != NULL, GST_BUS_DROP);
898
899   detail = gst_message_type_to_quark (GST_MESSAGE_TYPE (message));
900
901   g_signal_emit (bus, gst_bus_signals[SYNC_MESSAGE], detail, message);
902
903   return GST_BUS_PASS;
904 }
905
906 /**
907  * gst_bus_add_signal_watch:
908  * @bus: a #GstBus on which you want to recieve the "message" signal
909  *
910  * Adds a bus signal watch to the default main context with the default priority.
911  * After calling this statement, the bus will emit the message signal for each
912  * message posted on the bus.
913  *
914  * This function may be called multiple times. To clean up, the caller is
915  * responsible for calling gst_bus_remove_signal_watch() as many times as this
916  * function is called.
917  *
918  * MT safe.
919  */
920 void
921 gst_bus_add_signal_watch (GstBus * bus)
922 {
923   g_return_if_fail (GST_IS_BUS (bus));
924
925   /* I know the callees don't take this lock, so go ahead and abuse it */
926   GST_LOCK (bus);
927
928   if (bus->num_signal_watchers > 0)
929     goto done;
930
931   g_assert (bus->signal_watch_id == 0);
932
933   bus->signal_watch_id =
934       gst_bus_add_watch (bus, gst_bus_async_signal_func, NULL);
935
936 done:
937
938   bus->num_signal_watchers++;
939
940   GST_UNLOCK (bus);
941 }
942
943 /**
944  * gst_bus_remove_signal_watch:
945  * @bus: a #GstBus you previously added a signal watch to
946  *
947  * Removes a signal watch previously added with gst_bus_add_signal_watch().
948  *
949  * MT safe.
950  */
951 void
952 gst_bus_remove_signal_watch (GstBus * bus)
953 {
954   g_return_if_fail (GST_IS_BUS (bus));
955
956   /* I know the callees don't take this lock, so go ahead and abuse it */
957   GST_LOCK (bus);
958
959   if (bus->num_signal_watchers == 0)
960     goto error;
961
962   bus->num_signal_watchers--;
963
964   if (bus->num_signal_watchers > 0)
965     goto done;
966
967   g_source_remove (bus->signal_watch_id);
968   bus->signal_watch_id = 0;
969
970 done:
971   GST_UNLOCK (bus);
972   return;
973
974 error:
975   {
976     g_critical ("Bus %s has no signal watches attached", GST_OBJECT_NAME (bus));
977     GST_UNLOCK (bus);
978     return;
979   }
980 }