parse: Don't hold element's object lock while querying element pads' caps
[platform/upstream/gstreamer.git] / gst / gstbus.c
1 /* GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  *
4  * gstbus.c: GstBus subsystem
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:gstbus
24  * @short_description: Asynchronous message bus subsystem
25  * @see_also: #GstMessage, #GstElement
26  *
27  * The #GstBus is an object responsible for delivering #GstMessage packets in
28  * a first-in first-out way from the streaming threads (see #GstTask) to the
29  * application.
30  *
31  * Since the application typically only wants to deal with delivery of these
32  * messages from one thread, the GstBus will marshall the messages between
33  * different threads. This is important since the actual streaming of media
34  * is done in another thread than the application.
35  *
36  * The GstBus provides support for #GSource based notifications. This makes it
37  * possible to handle the delivery in the glib mainloop.
38  *
39  * The #GSource callback function gst_bus_async_signal_func() can be used to
40  * convert all bus messages into signal emissions.
41  *
42  * A message is posted on the bus with the gst_bus_post() method. With the
43  * gst_bus_peek() and gst_bus_pop() methods one can look at or retrieve a
44  * previously posted message.
45  *
46  * The bus can be polled with the gst_bus_poll() method. This methods blocks
47  * up to the specified timeout value until one of the specified messages types
48  * is posted on the bus. The application can then gst_bus_pop() the messages
49  * from the bus to handle them.
50  * Alternatively the application can register an asynchronous bus function
51  * using gst_bus_add_watch_full() or gst_bus_add_watch(). This function will
52  * install a #GSource in the default glib main loop and will deliver messages
53  * a short while after they have been posted. Note that the main loop should
54  * be running for the asynchronous callbacks.
55  *
56  * It is also possible to get messages from the bus without any thread
57  * marshalling with the gst_bus_set_sync_handler() method. This makes it
58  * possible to react to a message in the same thread that posted the
59  * message on the bus. This should only be used if the application is able
60  * to deal with messages from different threads.
61  *
62  * Every #GstPipeline has one bus.
63  *
64  * Note that a #GstPipeline will set its bus into flushing state when changing
65  * from READY to NULL state.
66  */
67
68 #include "gst_private.h"
69 #include <errno.h>
70 #ifdef HAVE_UNISTD_H
71 #  include <unistd.h>
72 #endif
73 #include <sys/types.h>
74
75 #include "gstatomicqueue.h"
76 #include "gstinfo.h"
77 #include "gstpoll.h"
78
79 #include "gstbus.h"
80 #include "glib-compat-private.h"
81
82 #ifdef G_OS_WIN32
83 #  ifndef EWOULDBLOCK
84 #  define EWOULDBLOCK EAGAIN    /* This is just to placate gcc */
85 #  endif
86 #endif /* G_OS_WIN32 */
87
88 #define GST_CAT_DEFAULT GST_CAT_BUS
89 /* bus signals */
90 enum
91 {
92   SYNC_MESSAGE,
93   ASYNC_MESSAGE,
94   /* add more above */
95   LAST_SIGNAL
96 };
97
98 #define DEFAULT_ENABLE_ASYNC (TRUE)
99
100 enum
101 {
102   PROP_0,
103   PROP_ENABLE_ASYNC
104 };
105
106 static void gst_bus_dispose (GObject * object);
107 static void gst_bus_finalize (GObject * object);
108
109 static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
110
111 struct _GstBusPrivate
112 {
113   GstAtomicQueue *queue;
114   GMutex queue_lock;
115
116   GstBusSyncHandler sync_handler;
117   gpointer sync_handler_data;
118   GDestroyNotify sync_handler_notify;
119
120   guint num_signal_watchers;
121
122   guint num_sync_message_emitters;
123   GSource *signal_watch;
124
125   gboolean enable_async;
126   GstPoll *poll;
127   GPollFD pollfd;
128 };
129
130 #define gst_bus_parent_class parent_class
131 G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT);
132
133 static void
134 gst_bus_set_property (GObject * object,
135     guint prop_id, const GValue * value, GParamSpec * pspec)
136 {
137   GstBus *bus = GST_BUS_CAST (object);
138
139   switch (prop_id) {
140     case PROP_ENABLE_ASYNC:
141       bus->priv->enable_async = g_value_get_boolean (value);
142       break;
143     default:
144       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
145       break;
146   }
147 }
148
149 static void
150 gst_bus_constructed (GObject * object)
151 {
152   GstBus *bus = GST_BUS_CAST (object);
153
154   if (bus->priv->enable_async) {
155     bus->priv->poll = gst_poll_new_timer ();
156     gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd);
157   }
158
159   G_OBJECT_CLASS (gst_bus_parent_class)->constructed (object);
160 }
161
162 static void
163 gst_bus_class_init (GstBusClass * klass)
164 {
165   GObjectClass *gobject_class = (GObjectClass *) klass;
166
167   gobject_class->dispose = gst_bus_dispose;
168   gobject_class->finalize = gst_bus_finalize;
169   gobject_class->set_property = gst_bus_set_property;
170   gobject_class->constructed = gst_bus_constructed;
171
172   /**
173    * GstBus::enable-async:
174    *
175    * Enable async message delivery support for bus watches,
176    * gst_bus_pop() and similar API. Without this only the
177    * synchronous message handlers are called.
178    *
179    * This property is used to create the child element buses
180    * in #GstBin.
181    */
182   g_object_class_install_property (gobject_class, PROP_ENABLE_ASYNC,
183       g_param_spec_boolean ("enable-async", "Enable Async",
184           "Enable async message delivery for bus watches and gst_bus_pop()",
185           DEFAULT_ENABLE_ASYNC,
186           G_PARAM_CONSTRUCT_ONLY | G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
187
188   /**
189    * GstBus::sync-message:
190    * @bus: the object which received the signal
191    * @message: the message that has been posted synchronously
192    *
193    * A message has been posted on the bus. This signal is emitted from the
194    * thread that posted the message so one has to be careful with locking.
195    *
196    * This signal will not be emitted by default, you have to call
197    * gst_bus_enable_sync_message_emission() before.
198    */
199   gst_bus_signals[SYNC_MESSAGE] =
200       g_signal_new ("sync-message", G_TYPE_FROM_CLASS (klass),
201       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
202       G_STRUCT_OFFSET (GstBusClass, sync_message), NULL, NULL,
203       g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
204
205   /**
206    * GstBus::message:
207    * @bus: the object which received the signal
208    * @message: the message that has been posted asynchronously
209    *
210    * A message has been posted on the bus. This signal is emitted from a
211    * GSource added to the mainloop. this signal will only be emitted when
212    * there is a mainloop running.
213    */
214   gst_bus_signals[ASYNC_MESSAGE] =
215       g_signal_new ("message", G_TYPE_FROM_CLASS (klass),
216       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
217       G_STRUCT_OFFSET (GstBusClass, message), NULL, NULL,
218       g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
219
220   g_type_class_add_private (klass, sizeof (GstBusPrivate));
221 }
222
223 static void
224 gst_bus_init (GstBus * bus)
225 {
226   bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate);
227   bus->priv->enable_async = DEFAULT_ENABLE_ASYNC;
228   g_mutex_init (&bus->priv->queue_lock);
229   bus->priv->queue = gst_atomic_queue_new (32);
230
231   /* clear floating flag */
232   gst_object_ref_sink (bus);
233
234   GST_DEBUG_OBJECT (bus, "created");
235 }
236
237 static void
238 gst_bus_dispose (GObject * object)
239 {
240   GstBus *bus = GST_BUS (object);
241
242   if (bus->priv->queue) {
243     GstMessage *message;
244
245     g_mutex_lock (&bus->priv->queue_lock);
246     do {
247       message = gst_atomic_queue_pop (bus->priv->queue);
248       if (message)
249         gst_message_unref (message);
250     } while (message != NULL);
251     gst_atomic_queue_unref (bus->priv->queue);
252     bus->priv->queue = NULL;
253     g_mutex_unlock (&bus->priv->queue_lock);
254     g_mutex_clear (&bus->priv->queue_lock);
255
256     if (bus->priv->poll)
257       gst_poll_free (bus->priv->poll);
258     bus->priv->poll = NULL;
259   }
260
261   G_OBJECT_CLASS (parent_class)->dispose (object);
262 }
263
264 static void
265 gst_bus_finalize (GObject * object)
266 {
267   GstBus *bus = GST_BUS (object);
268
269   if (bus->priv->sync_handler_notify)
270     bus->priv->sync_handler_notify (bus->priv->sync_handler_data);
271
272   G_OBJECT_CLASS (parent_class)->finalize (object);
273 }
274
275 /**
276  * gst_bus_new:
277  *
278  * Creates a new #GstBus instance.
279  *
280  * Returns: (transfer full): a new #GstBus instance
281  */
282 GstBus *
283 gst_bus_new (void)
284 {
285   GstBus *result;
286
287   result = g_object_newv (gst_bus_get_type (), 0, NULL);
288   GST_DEBUG_OBJECT (result, "created new bus");
289
290   return result;
291 }
292
293 /**
294  * gst_bus_post:
295  * @bus: a #GstBus to post on
296  * @message: (transfer full): the #GstMessage to post
297  *
298  * Post a message on the given bus. Ownership of the message
299  * is taken by the bus.
300  *
301  * Returns: %TRUE if the message could be posted, %FALSE if the bus is flushing.
302  *
303  * MT safe.
304  */
305 gboolean
306 gst_bus_post (GstBus * bus, GstMessage * message)
307 {
308   GstBusSyncReply reply = GST_BUS_PASS;
309   GstBusSyncHandler handler;
310   gboolean emit_sync_message;
311   gpointer handler_data;
312
313   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
314   g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
315
316   GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus %" GST_PTR_FORMAT, message,
317       message);
318
319   /* check we didn't accidentally add a public flag that maps to same value */
320   g_assert (!GST_MINI_OBJECT_FLAG_IS_SET (message,
321           GST_MESSAGE_FLAG_ASYNC_DELIVERY));
322
323   GST_OBJECT_LOCK (bus);
324   /* check if the bus is flushing */
325   if (GST_OBJECT_FLAG_IS_SET (bus, GST_BUS_FLUSHING))
326     goto is_flushing;
327
328   handler = bus->priv->sync_handler;
329   handler_data = bus->priv->sync_handler_data;
330   emit_sync_message = bus->priv->num_sync_message_emitters > 0;
331   GST_OBJECT_UNLOCK (bus);
332
333   /* first call the sync handler if it is installed */
334   if (handler)
335     reply = handler (bus, message, handler_data);
336
337   /* emit sync-message if requested to do so via
338      gst_bus_enable_sync_message_emission. terrible but effective */
339   if (emit_sync_message && reply != GST_BUS_DROP
340       && handler != gst_bus_sync_signal_handler)
341     gst_bus_sync_signal_handler (bus, message, NULL);
342
343   /* If this is a bus without async message delivery
344    * always drop the message */
345   if (!bus->priv->poll)
346     reply = GST_BUS_DROP;
347
348   /* now see what we should do with the message */
349   switch (reply) {
350     case GST_BUS_DROP:
351       /* drop the message */
352       GST_DEBUG_OBJECT (bus, "[msg %p] dropped", message);
353       break;
354     case GST_BUS_PASS:
355       /* pass the message to the async queue, refcount passed in the queue */
356       GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
357       gst_atomic_queue_push (bus->priv->queue, message);
358       gst_poll_write_control (bus->priv->poll);
359       GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
360
361       break;
362     case GST_BUS_ASYNC:
363     {
364       /* async delivery, we need a mutex and a cond to block
365        * on */
366       GCond *cond = GST_MESSAGE_GET_COND (message);
367       GMutex *lock = GST_MESSAGE_GET_LOCK (message);
368
369       g_cond_init (cond);
370       g_mutex_init (lock);
371
372       GST_MINI_OBJECT_FLAG_SET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY);
373
374       GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
375
376       /* now we lock the message mutex, send the message to the async
377        * queue. When the message is handled by the app and destroyed,
378        * the cond will be signalled and we can continue */
379       g_mutex_lock (lock);
380
381       gst_atomic_queue_push (bus->priv->queue, message);
382       gst_poll_write_control (bus->priv->poll);
383
384       /* now block till the message is freed */
385       g_cond_wait (cond, lock);
386
387       /* we acquired a new ref from gst_message_dispose() so we can clean up */
388       g_mutex_unlock (lock);
389
390       GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
391
392       GST_MINI_OBJECT_FLAG_UNSET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY);
393
394       g_mutex_clear (lock);
395       g_cond_clear (cond);
396
397       gst_message_unref (message);
398       break;
399     }
400     default:
401       g_warning ("invalid return from bus sync handler");
402       break;
403   }
404   return TRUE;
405
406   /* ERRORS */
407 is_flushing:
408   {
409     GST_DEBUG_OBJECT (bus, "bus is flushing");
410     GST_OBJECT_UNLOCK (bus);
411     gst_message_unref (message);
412
413     return FALSE;
414   }
415 }
416
417 /**
418  * gst_bus_have_pending:
419  * @bus: a #GstBus to check
420  *
421  * Check if there are pending messages on the bus that
422  * should be handled.
423  *
424  * Returns: %TRUE if there are messages on the bus to be handled, %FALSE
425  * otherwise.
426  *
427  * MT safe.
428  */
429 gboolean
430 gst_bus_have_pending (GstBus * bus)
431 {
432   gboolean result;
433
434   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
435
436   /* see if there is a message on the bus */
437   result = gst_atomic_queue_length (bus->priv->queue) != 0;
438
439   return result;
440 }
441
442 /**
443  * gst_bus_set_flushing:
444  * @bus: a #GstBus
445  * @flushing: whether or not to flush the bus
446  *
447  * If @flushing, flush out and unref any messages queued in the bus. Releases
448  * references to the message origin objects. Will flush future messages until
449  * gst_bus_set_flushing() sets @flushing to %FALSE.
450  *
451  * MT safe.
452  */
453 void
454 gst_bus_set_flushing (GstBus * bus, gboolean flushing)
455 {
456   GstMessage *message;
457   GList *message_list = NULL;
458
459   g_return_if_fail (GST_IS_BUS (bus));
460
461   GST_OBJECT_LOCK (bus);
462
463   if (flushing) {
464     GST_OBJECT_FLAG_SET (bus, GST_BUS_FLUSHING);
465
466     GST_DEBUG_OBJECT (bus, "set bus flushing");
467
468     while ((message = gst_bus_pop (bus)))
469       message_list = g_list_prepend (message_list, message);
470   } else {
471     GST_DEBUG_OBJECT (bus, "unset bus flushing");
472     GST_OBJECT_FLAG_UNSET (bus, GST_BUS_FLUSHING);
473   }
474
475   GST_OBJECT_UNLOCK (bus);
476
477   g_list_free_full (message_list, (GDestroyNotify) gst_message_unref);
478 }
479
480 /**
481  * gst_bus_timed_pop_filtered:
482  * @bus: a #GstBus to pop from
483  * @timeout: a timeout in nanoseconds, or GST_CLOCK_TIME_NONE to wait forever
484  * @types: message types to take into account, GST_MESSAGE_ANY for any type
485  *
486  * Get a message from the bus whose type matches the message type mask @types,
487  * waiting up to the specified timeout (and discarding any messages that do not
488  * match the mask provided).
489  *
490  * If @timeout is 0, this function behaves like gst_bus_pop_filtered(). If
491  * @timeout is #GST_CLOCK_TIME_NONE, this function will block forever until a
492  * matching message was posted on the bus.
493  *
494  * Returns: (transfer full) (nullable): a #GstMessage matching the
495  *     filter in @types, or %NULL if no matching message was found on
496  *     the bus until the timeout expired. The message is taken from
497  *     the bus and needs to be unreffed with gst_message_unref() after
498  *     usage.
499  *
500  * MT safe.
501  */
502 GstMessage *
503 gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
504     GstMessageType types)
505 {
506   GstMessage *message;
507   GTimeVal now, then;
508   gboolean first_round = TRUE;
509   GstClockTime elapsed = 0;
510
511   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
512   g_return_val_if_fail (types != 0, NULL);
513   g_return_val_if_fail (timeout == 0 || bus->priv->poll != NULL, NULL);
514
515   g_mutex_lock (&bus->priv->queue_lock);
516
517   while (TRUE) {
518     gint ret;
519
520     GST_LOG_OBJECT (bus, "have %d messages",
521         gst_atomic_queue_length (bus->priv->queue));
522
523     while ((message = gst_atomic_queue_pop (bus->priv->queue))) {
524       if (bus->priv->poll) {
525         while (!gst_poll_read_control (bus->priv->poll)) {
526           if (errno == EWOULDBLOCK) {
527             /* Retry, this can happen if pushing to the queue has finished,
528              * popping here succeeded but writing control did not finish
529              * before we got to this line. */
530             /* Give other threads the chance to do something */
531             g_thread_yield ();
532             continue;
533           } else {
534             /* This is a real error and means that either the bus is in an
535              * inconsistent state, or the GstPoll is invalid. GstPoll already
536              * prints a critical warning about this, no need to do that again
537              * ourselves */
538             break;
539           }
540         }
541       }
542
543       GST_DEBUG_OBJECT (bus, "got message %p, %s from %s, type mask is %u",
544           message, GST_MESSAGE_TYPE_NAME (message),
545           GST_MESSAGE_SRC_NAME (message), (guint) types);
546       if ((GST_MESSAGE_TYPE (message) & types) != 0) {
547         /* Extra check to ensure extended types don't get matched unless
548          * asked for */
549         if ((!GST_MESSAGE_TYPE_IS_EXTENDED (message))
550             || (types & GST_MESSAGE_EXTENDED)) {
551           /* exit the loop, we have a message */
552           goto beach;
553         }
554       }
555
556       GST_DEBUG_OBJECT (bus, "discarding message, does not match mask");
557       gst_message_unref (message);
558       message = NULL;
559     }
560
561     /* no need to wait, exit loop */
562     if (timeout == 0)
563       break;
564
565     else if (timeout != GST_CLOCK_TIME_NONE) {
566       if (first_round) {
567         g_get_current_time (&then);
568         first_round = FALSE;
569       } else {
570         g_get_current_time (&now);
571
572         elapsed = GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (then);
573
574         if (elapsed > timeout)
575           break;
576       }
577     }
578
579     /* only here in timeout case */
580     g_assert (bus->priv->poll);
581     g_mutex_unlock (&bus->priv->queue_lock);
582     ret = gst_poll_wait (bus->priv->poll, timeout - elapsed);
583     g_mutex_lock (&bus->priv->queue_lock);
584
585     if (ret == 0) {
586       GST_INFO_OBJECT (bus, "timed out, breaking loop");
587       break;
588     } else {
589       GST_INFO_OBJECT (bus, "we got woken up, recheck for message");
590     }
591   }
592
593 beach:
594
595   g_mutex_unlock (&bus->priv->queue_lock);
596
597   return message;
598 }
599
600
601 /**
602  * gst_bus_timed_pop:
603  * @bus: a #GstBus to pop
604  * @timeout: a timeout
605  *
606  * Get a message from the bus, waiting up to the specified timeout.
607  *
608  * If @timeout is 0, this function behaves like gst_bus_pop(). If @timeout is
609  * #GST_CLOCK_TIME_NONE, this function will block forever until a message was
610  * posted on the bus.
611  *
612  * Returns: (transfer full) (nullable): the #GstMessage that is on the
613  *     bus after the specified timeout or %NULL if the bus is empty
614  *     after the timeout expired.  The message is taken from the bus
615  *     and needs to be unreffed with gst_message_unref() after usage.
616  *
617  * MT safe.
618  */
619 GstMessage *
620 gst_bus_timed_pop (GstBus * bus, GstClockTime timeout)
621 {
622   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
623
624   return gst_bus_timed_pop_filtered (bus, timeout, GST_MESSAGE_ANY);
625 }
626
627 /**
628  * gst_bus_pop_filtered:
629  * @bus: a #GstBus to pop
630  * @types: message types to take into account
631  *
632  * Get a message matching @type from the bus.  Will discard all messages on
633  * the bus that do not match @type and that have been posted before the first
634  * message that does match @type.  If there is no message matching @type on
635  * the bus, all messages will be discarded. It is not possible to use message
636  * enums beyond #GST_MESSAGE_EXTENDED in the @events mask.
637  *
638  * Returns: (transfer full) (nullable): the next #GstMessage matching
639  *     @type that is on the bus, or %NULL if the bus is empty or there
640  *     is no message matching @type. The message is taken from the bus
641  *     and needs to be unreffed with gst_message_unref() after usage.
642  *
643  * MT safe.
644  */
645 GstMessage *
646 gst_bus_pop_filtered (GstBus * bus, GstMessageType types)
647 {
648   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
649   g_return_val_if_fail (types != 0, NULL);
650
651   return gst_bus_timed_pop_filtered (bus, 0, types);
652 }
653
654 /**
655  * gst_bus_pop:
656  * @bus: a #GstBus to pop
657  *
658  * Get a message from the bus.
659  *
660  * Returns: (transfer full) (nullable): the #GstMessage that is on the
661  *     bus, or %NULL if the bus is empty. The message is taken from
662  *     the bus and needs to be unreffed with gst_message_unref() after
663  *     usage.
664  *
665  * MT safe.
666  */
667 GstMessage *
668 gst_bus_pop (GstBus * bus)
669 {
670   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
671
672   return gst_bus_timed_pop_filtered (bus, 0, GST_MESSAGE_ANY);
673 }
674
675 /**
676  * gst_bus_peek:
677  * @bus: a #GstBus
678  *
679  * Peek the message on the top of the bus' queue. The message will remain
680  * on the bus' message queue. A reference is returned, and needs to be unreffed
681  * by the caller.
682  *
683  * Returns: (transfer full) (nullable): the #GstMessage that is on the
684  *     bus, or %NULL if the bus is empty.
685  *
686  * MT safe.
687  */
688 GstMessage *
689 gst_bus_peek (GstBus * bus)
690 {
691   GstMessage *message;
692
693   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
694
695   g_mutex_lock (&bus->priv->queue_lock);
696   message = gst_atomic_queue_peek (bus->priv->queue);
697   if (message)
698     gst_message_ref (message);
699   g_mutex_unlock (&bus->priv->queue_lock);
700
701   GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
702
703   return message;
704 }
705
706 /**
707  * gst_bus_set_sync_handler:
708  * @bus: a #GstBus to install the handler on
709  * @func: (allow-none): The handler function to install
710  * @user_data: User data that will be sent to the handler function.
711  * @notify: called when @user_data becomes unused
712  *
713  * Sets the synchronous handler on the bus. The function will be called
714  * every time a new message is posted on the bus. Note that the function
715  * will be called in the same thread context as the posting object. This
716  * function is usually only called by the creator of the bus. Applications
717  * should handle messages asynchronously using the gst_bus watch and poll
718  * functions.
719  *
720  * You cannot replace an existing sync_handler. You can pass %NULL to this
721  * function, which will clear the existing handler.
722  */
723 void
724 gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func,
725     gpointer user_data, GDestroyNotify notify)
726 {
727   GDestroyNotify old_notify;
728
729   g_return_if_fail (GST_IS_BUS (bus));
730
731   GST_OBJECT_LOCK (bus);
732   /* Assert if the user attempts to replace an existing sync_handler,
733    * other than to clear it */
734   if (func != NULL && bus->priv->sync_handler != NULL)
735     goto no_replace;
736
737   if ((old_notify = bus->priv->sync_handler_notify)) {
738     gpointer old_data = bus->priv->sync_handler_data;
739
740     bus->priv->sync_handler_data = NULL;
741     bus->priv->sync_handler_notify = NULL;
742     GST_OBJECT_UNLOCK (bus);
743
744     old_notify (old_data);
745
746     GST_OBJECT_LOCK (bus);
747   }
748   bus->priv->sync_handler = func;
749   bus->priv->sync_handler_data = user_data;
750   bus->priv->sync_handler_notify = notify;
751   GST_OBJECT_UNLOCK (bus);
752
753   return;
754
755 no_replace:
756   {
757     GST_OBJECT_UNLOCK (bus);
758     g_warning ("cannot replace existing sync handler");
759     return;
760   }
761 }
762
763 /* GSource for the bus
764  */
765 typedef struct
766 {
767   GSource source;
768   GstBus *bus;
769 } GstBusSource;
770
771 static gboolean
772 gst_bus_source_prepare (GSource * source, gint * timeout)
773 {
774   *timeout = -1;
775   return FALSE;
776 }
777
778 static gboolean
779 gst_bus_source_check (GSource * source)
780 {
781   GstBusSource *bsrc = (GstBusSource *) source;
782
783   return bsrc->bus->priv->pollfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR);
784 }
785
786 static gboolean
787 gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
788     gpointer user_data)
789 {
790   GstBusFunc handler = (GstBusFunc) callback;
791   GstBusSource *bsource = (GstBusSource *) source;
792   GstMessage *message;
793   gboolean keep;
794   GstBus *bus;
795
796   g_return_val_if_fail (bsource != NULL, FALSE);
797
798   bus = bsource->bus;
799
800   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
801
802   message = gst_bus_pop (bus);
803
804   /* The message queue might be empty if some other thread or callback set
805    * the bus to flushing between check/prepare and dispatch */
806   if (G_UNLIKELY (message == NULL))
807     return TRUE;
808
809   if (!handler)
810     goto no_handler;
811
812   GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %" GST_PTR_FORMAT,
813       source, message);
814
815   keep = handler (bus, message, user_data);
816   gst_message_unref (message);
817
818   GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, keep);
819
820   return keep;
821
822 no_handler:
823   {
824     g_warning ("GstBus watch dispatched without callback\n"
825         "You must call g_source_set_callback().");
826     gst_message_unref (message);
827     return FALSE;
828   }
829 }
830
831 static void
832 gst_bus_source_finalize (GSource * source)
833 {
834   GstBusSource *bsource = (GstBusSource *) source;
835   GstBus *bus;
836
837   bus = bsource->bus;
838
839   GST_DEBUG_OBJECT (bus, "finalize source %p", source);
840
841   GST_OBJECT_LOCK (bus);
842   if (bus->priv->signal_watch == source)
843     bus->priv->signal_watch = NULL;
844   GST_OBJECT_UNLOCK (bus);
845
846   gst_object_unref (bsource->bus);
847   bsource->bus = NULL;
848 }
849
850 static GSourceFuncs gst_bus_source_funcs = {
851   gst_bus_source_prepare,
852   gst_bus_source_check,
853   gst_bus_source_dispatch,
854   gst_bus_source_finalize
855 };
856
857 /**
858  * gst_bus_create_watch:
859  * @bus: a #GstBus to create the watch for
860  *
861  * Create watch for this bus. The GSource will be dispatched whenever
862  * a message is on the bus. After the GSource is dispatched, the
863  * message is popped off the bus and unreffed.
864  *
865  * Returns: (transfer full): a #GSource that can be added to a mainloop.
866  */
867 GSource *
868 gst_bus_create_watch (GstBus * bus)
869 {
870   GstBusSource *source;
871
872   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
873   g_return_val_if_fail (bus->priv->poll != NULL, NULL);
874
875   source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
876       sizeof (GstBusSource));
877
878   g_source_set_name ((GSource *) source, "GStreamer message bus watch");
879
880   source->bus = gst_object_ref (bus);
881   g_source_add_poll ((GSource *) source, &bus->priv->pollfd);
882
883   return (GSource *) source;
884 }
885
886 /* must be called with the bus OBJECT LOCK */
887 static guint
888 gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority,
889     GstBusFunc func, gpointer user_data, GDestroyNotify notify)
890 {
891   GMainContext *ctx;
892   guint id;
893   GSource *source;
894
895   if (bus->priv->signal_watch) {
896     GST_ERROR_OBJECT (bus,
897         "Tried to add new watch while one was already there");
898     return 0;
899   }
900
901   source = gst_bus_create_watch (bus);
902   if (!source) {
903     g_critical ("Creating bus watch failed");
904     return 0;
905   }
906
907   if (priority != G_PRIORITY_DEFAULT)
908     g_source_set_priority (source, priority);
909
910   g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
911
912   ctx = g_main_context_get_thread_default ();
913   id = g_source_attach (source, ctx);
914   g_source_unref (source);
915
916   if (id) {
917     bus->priv->signal_watch = source;
918   }
919
920   GST_DEBUG_OBJECT (bus, "New source %p with id %u", source, id);
921   return id;
922 }
923
924 /**
925  * gst_bus_add_watch_full: (rename-to gst_bus_add_watch)
926  * @bus: a #GstBus to create the watch for.
927  * @priority: The priority of the watch.
928  * @func: A function to call when a message is received.
929  * @user_data: user data passed to @func.
930  * @notify: the function to call when the source is removed.
931  *
932  * Adds a bus watch to the default main context with the given @priority (e.g.
933  * %G_PRIORITY_DEFAULT). It is also possible to use a non-default  main
934  * context set up using g_main_context_push_thread_default() (before
935  * one had to create a bus watch source and attach it to the desired main
936  * context 'manually').
937  *
938  * This function is used to receive asynchronous messages in the main loop.
939  * There can only be a single bus watch per bus, you must remove it before you
940  * can set a new one.
941  *
942  * The bus watch will only work if a GLib main loop is being run.
943  *
944  * When @func is called, the message belongs to the caller; if you want to
945  * keep a copy of it, call gst_message_ref() before leaving @func.
946  *
947  * The watch can be removed using gst_bus_remove_watch() or by returning %FALSE
948  * from @func. If the watch was added to the default main context it is also
949  * possible to remove the watch using g_source_remove().
950  *
951  * MT safe.
952  *
953  * Returns: The event source id or 0 if @bus already got an event source.
954  */
955 guint
956 gst_bus_add_watch_full (GstBus * bus, gint priority,
957     GstBusFunc func, gpointer user_data, GDestroyNotify notify)
958 {
959   guint id;
960
961   g_return_val_if_fail (GST_IS_BUS (bus), 0);
962
963   GST_OBJECT_LOCK (bus);
964   id = gst_bus_add_watch_full_unlocked (bus, priority, func, user_data, notify);
965   GST_OBJECT_UNLOCK (bus);
966
967   return id;
968 }
969
970 /**
971  * gst_bus_add_watch: (skip)
972  * @bus: a #GstBus to create the watch for
973  * @func: A function to call when a message is received.
974  * @user_data: user data passed to @func.
975  *
976  * Adds a bus watch to the default main context with the default priority
977  * (%G_PRIORITY_DEFAULT). It is also possible to use a non-default main
978  * context set up using g_main_context_push_thread_default() (before
979  * one had to create a bus watch source and attach it to the desired main
980  * context 'manually').
981  *
982  * This function is used to receive asynchronous messages in the main loop.
983  * There can only be a single bus watch per bus, you must remove it before you
984  * can set a new one.
985  *
986  * The bus watch will only work if a GLib main loop is being run.
987  *
988  * The watch can be removed using gst_bus_remove_watch() or by returning %FALSE
989  * from @func. If the watch was added to the default main context it is also
990  * possible to remove the watch using g_source_remove().
991  *
992  * Returns: The event source id or 0 if @bus already got an event source.
993  *
994  * MT safe.
995  */
996 guint
997 gst_bus_add_watch (GstBus * bus, GstBusFunc func, gpointer user_data)
998 {
999   return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, func,
1000       user_data, NULL);
1001 }
1002
1003 /**
1004  * gst_bus_remove_watch:
1005  * @bus: a #GstBus to remove the watch from.
1006  *
1007  * Removes an installed bus watch from @bus.
1008  *
1009  * Returns: %TRUE on success or %FALSE if @bus has no event source.
1010  *
1011  * Since: 1.6
1012  *
1013  */
1014 gboolean
1015 gst_bus_remove_watch (GstBus * bus)
1016 {
1017   GSource *watch_id;
1018
1019   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
1020
1021   GST_OBJECT_LOCK (bus);
1022
1023   if (bus->priv->signal_watch == NULL) {
1024     GST_ERROR_OBJECT (bus, "no bus watch was present");
1025     goto no_watch;
1026   }
1027
1028   watch_id = bus->priv->signal_watch;
1029
1030   GST_OBJECT_UNLOCK (bus);
1031
1032   g_source_destroy (watch_id);
1033
1034   return TRUE;
1035
1036 no_watch:
1037   GST_OBJECT_UNLOCK (bus);
1038
1039   return FALSE;
1040 }
1041
1042 typedef struct
1043 {
1044   GMainLoop *loop;
1045   guint timeout_id;
1046   gboolean source_running;
1047   GstMessageType events;
1048   GstMessage *message;
1049 } GstBusPollData;
1050
1051 static void
1052 poll_func (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
1053 {
1054   GstMessageType type;
1055
1056   if (!g_main_loop_is_running (poll_data->loop)) {
1057     GST_DEBUG ("mainloop %p not running", poll_data->loop);
1058     return;
1059   }
1060
1061   type = GST_MESSAGE_TYPE (message);
1062
1063   if (type & poll_data->events) {
1064     g_assert (poll_data->message == NULL);
1065     /* keep ref to message */
1066     poll_data->message = gst_message_ref (message);
1067     GST_DEBUG ("mainloop %p quit", poll_data->loop);
1068     g_main_loop_quit (poll_data->loop);
1069   } else {
1070     GST_DEBUG ("type %08x does not match %08x", type, poll_data->events);
1071   }
1072 }
1073
1074 static gboolean
1075 poll_timeout (GstBusPollData * poll_data)
1076 {
1077   GST_DEBUG ("mainloop %p quit", poll_data->loop);
1078   g_main_loop_quit (poll_data->loop);
1079
1080   /* we don't remove the GSource as this would free our poll_data,
1081    * which we still need */
1082   return TRUE;
1083 }
1084
1085 static void
1086 poll_destroy (GstBusPollData * poll_data, gpointer unused)
1087 {
1088   poll_data->source_running = FALSE;
1089   if (!poll_data->timeout_id) {
1090     g_main_loop_unref (poll_data->loop);
1091     g_slice_free (GstBusPollData, poll_data);
1092   }
1093 }
1094
1095 static void
1096 poll_destroy_timeout (GstBusPollData * poll_data)
1097 {
1098   poll_data->timeout_id = 0;
1099   if (!poll_data->source_running) {
1100     g_main_loop_unref (poll_data->loop);
1101     g_slice_free (GstBusPollData, poll_data);
1102   }
1103 }
1104
1105 /**
1106  * gst_bus_poll:
1107  * @bus: a #GstBus
1108  * @events: a mask of #GstMessageType, representing the set of message types to
1109  * poll for (note special handling of extended message types below)
1110  * @timeout: the poll timeout, as a #GstClockTime, or #GST_CLOCK_TIME_NONE to poll
1111  * indefinitely.
1112  *
1113  * Poll the bus for messages. Will block while waiting for messages to come.
1114  * You can specify a maximum time to poll with the @timeout parameter. If
1115  * @timeout is negative, this function will block indefinitely.
1116  *
1117  * All messages not in @events will be popped off the bus and will be ignored.
1118  * It is not possible to use message enums beyond #GST_MESSAGE_EXTENDED in the
1119  * @events mask
1120  *
1121  * Because poll is implemented using the "message" signal enabled by
1122  * gst_bus_add_signal_watch(), calling gst_bus_poll() will cause the "message"
1123  * signal to be emitted for every message that poll sees. Thus a "message"
1124  * signal handler will see the same messages that this function sees -- neither
1125  * will steal messages from the other.
1126  *
1127  * This function will run a main loop from the default main context when
1128  * polling.
1129  *
1130  * You should never use this function, since it is pure evil. This is
1131  * especially true for GUI applications based on Gtk+ or Qt, but also for any
1132  * other non-trivial application that uses the GLib main loop. As this function
1133  * runs a GLib main loop, any callback attached to the default GLib main
1134  * context may be invoked. This could be timeouts, GUI events, I/O events etc.;
1135  * even if gst_bus_poll() is called with a 0 timeout. Any of these callbacks
1136  * may do things you do not expect, e.g. destroy the main application window or
1137  * some other resource; change other application state; display a dialog and
1138  * run another main loop until the user clicks it away. In short, using this
1139  * function may add a lot of complexity to your code through unexpected
1140  * re-entrancy and unexpected changes to your application's state.
1141  *
1142  * For 0 timeouts use gst_bus_pop_filtered() instead of this function; for
1143  * other short timeouts use gst_bus_timed_pop_filtered(); everything else is
1144  * better handled by setting up an asynchronous bus watch and doing things
1145  * from there.
1146  *
1147  * Returns: (transfer full) (nullable): the message that was received,
1148  *     or %NULL if the poll timed out. The message is taken from the
1149  *     bus and needs to be unreffed with gst_message_unref() after
1150  *     usage.
1151  */
1152 GstMessage *
1153 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTime timeout)
1154 {
1155   GstBusPollData *poll_data;
1156   GstMessage *ret;
1157   gulong id;
1158
1159   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
1160
1161   poll_data = g_slice_new (GstBusPollData);
1162   poll_data->source_running = TRUE;
1163   poll_data->loop = g_main_loop_new (NULL, FALSE);
1164   poll_data->events = events;
1165   poll_data->message = NULL;
1166
1167   if (timeout != GST_CLOCK_TIME_NONE)
1168     poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
1169         timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
1170         (GDestroyNotify) poll_destroy_timeout);
1171   else
1172     poll_data->timeout_id = 0;
1173
1174   id = g_signal_connect_data (bus, "message", G_CALLBACK (poll_func), poll_data,
1175       (GClosureNotify) poll_destroy, 0);
1176
1177   /* these can be nested, so it's ok */
1178   gst_bus_add_signal_watch (bus);
1179
1180   GST_DEBUG ("running mainloop %p", poll_data->loop);
1181   g_main_loop_run (poll_data->loop);
1182   GST_DEBUG ("mainloop stopped %p", poll_data->loop);
1183
1184   gst_bus_remove_signal_watch (bus);
1185
1186   /* holds a ref */
1187   ret = poll_data->message;
1188
1189   if (poll_data->timeout_id)
1190     g_source_remove (poll_data->timeout_id);
1191
1192   /* poll_data will be freed now */
1193   g_signal_handler_disconnect (bus, id);
1194
1195   GST_DEBUG_OBJECT (bus, "finished poll with message %p", ret);
1196
1197   return ret;
1198 }
1199
1200 /**
1201  * gst_bus_async_signal_func:
1202  * @bus: a #GstBus
1203  * @message: the #GstMessage received
1204  * @data: user data
1205  *
1206  * A helper #GstBusFunc that can be used to convert all asynchronous messages
1207  * into signals.
1208  *
1209  * Returns: %TRUE
1210  */
1211 gboolean
1212 gst_bus_async_signal_func (GstBus * bus, GstMessage * message, gpointer data)
1213 {
1214   GQuark detail = 0;
1215
1216   g_return_val_if_fail (GST_IS_BUS (bus), TRUE);
1217   g_return_val_if_fail (message != NULL, TRUE);
1218
1219   detail = gst_message_type_to_quark (GST_MESSAGE_TYPE (message));
1220
1221   g_signal_emit (bus, gst_bus_signals[ASYNC_MESSAGE], detail, message);
1222
1223   /* we never remove this source based on signal emission return values */
1224   return TRUE;
1225 }
1226
1227 /**
1228  * gst_bus_sync_signal_handler:
1229  * @bus: a #GstBus
1230  * @message: the #GstMessage received
1231  * @data: user data
1232  *
1233  * A helper GstBusSyncHandler that can be used to convert all synchronous
1234  * messages into signals.
1235  *
1236  * Returns: GST_BUS_PASS
1237  */
1238 GstBusSyncReply
1239 gst_bus_sync_signal_handler (GstBus * bus, GstMessage * message, gpointer data)
1240 {
1241   GQuark detail = 0;
1242
1243   g_return_val_if_fail (GST_IS_BUS (bus), GST_BUS_DROP);
1244   g_return_val_if_fail (message != NULL, GST_BUS_DROP);
1245
1246   detail = gst_message_type_to_quark (GST_MESSAGE_TYPE (message));
1247
1248   g_signal_emit (bus, gst_bus_signals[SYNC_MESSAGE], detail, message);
1249
1250   return GST_BUS_PASS;
1251 }
1252
1253 /**
1254  * gst_bus_enable_sync_message_emission:
1255  * @bus: a #GstBus on which you want to receive the "sync-message" signal
1256  *
1257  * Instructs GStreamer to emit the "sync-message" signal after running the bus's
1258  * sync handler. This function is here so that code can ensure that they can
1259  * synchronously receive messages without having to affect what the bin's sync
1260  * handler is.
1261  *
1262  * This function may be called multiple times. To clean up, the caller is
1263  * responsible for calling gst_bus_disable_sync_message_emission() as many times
1264  * as this function is called.
1265  *
1266  * While this function looks similar to gst_bus_add_signal_watch(), it is not
1267  * exactly the same -- this function enables <emphasis>synchronous</emphasis> emission of
1268  * signals when messages arrive; gst_bus_add_signal_watch() adds an idle callback
1269  * to pop messages off the bus <emphasis>asynchronously</emphasis>. The sync-message signal
1270  * comes from the thread of whatever object posted the message; the "message"
1271  * signal is marshalled to the main thread via the main loop.
1272  *
1273  * MT safe.
1274  */
1275 void
1276 gst_bus_enable_sync_message_emission (GstBus * bus)
1277 {
1278   g_return_if_fail (GST_IS_BUS (bus));
1279
1280   GST_OBJECT_LOCK (bus);
1281   bus->priv->num_sync_message_emitters++;
1282   GST_OBJECT_UNLOCK (bus);
1283 }
1284
1285 /**
1286  * gst_bus_disable_sync_message_emission:
1287  * @bus: a #GstBus on which you previously called
1288  * gst_bus_enable_sync_message_emission()
1289  *
1290  * Instructs GStreamer to stop emitting the "sync-message" signal for this bus.
1291  * See gst_bus_enable_sync_message_emission() for more information.
1292  *
1293  * In the event that multiple pieces of code have called
1294  * gst_bus_enable_sync_message_emission(), the sync-message emissions will only
1295  * be stopped after all calls to gst_bus_enable_sync_message_emission() were
1296  * "cancelled" by calling this function. In this way the semantics are exactly
1297  * the same as gst_object_ref() that which calls enable should also call
1298  * disable.
1299  *
1300  * MT safe.
1301  */
1302 void
1303 gst_bus_disable_sync_message_emission (GstBus * bus)
1304 {
1305   g_return_if_fail (GST_IS_BUS (bus));
1306   g_return_if_fail (bus->priv->num_sync_message_emitters > 0);
1307
1308   GST_OBJECT_LOCK (bus);
1309   bus->priv->num_sync_message_emitters--;
1310   GST_OBJECT_UNLOCK (bus);
1311 }
1312
1313 /**
1314  * gst_bus_add_signal_watch_full:
1315  * @bus: a #GstBus on which you want to receive the "message" signal
1316  * @priority: The priority of the watch.
1317  *
1318  * Adds a bus signal watch to the default main context with the given @priority
1319  * (e.g. %G_PRIORITY_DEFAULT). It is also possible to use a non-default main
1320  * context set up using g_main_context_push_thread_default()
1321  * (before one had to create a bus watch source and attach it to the desired
1322  * main context 'manually').
1323  *
1324  * After calling this statement, the bus will emit the "message" signal for each
1325  * message posted on the bus when the main loop is running.
1326  *
1327  * This function may be called multiple times. To clean up, the caller is
1328  * responsible for calling gst_bus_remove_signal_watch() as many times as this
1329  * function is called.
1330  *
1331  * There can only be a single bus watch per bus, you must remove any signal
1332  * watch before you can set another type of watch.
1333  *
1334  * MT safe.
1335  */
1336 void
1337 gst_bus_add_signal_watch_full (GstBus * bus, gint priority)
1338 {
1339   g_return_if_fail (GST_IS_BUS (bus));
1340
1341   /* I know the callees don't take this lock, so go ahead and abuse it */
1342   GST_OBJECT_LOCK (bus);
1343
1344   if (bus->priv->num_signal_watchers > 0)
1345     goto done;
1346
1347   /* this should not fail because the counter above takes care of it */
1348   g_assert (!bus->priv->signal_watch);
1349
1350   gst_bus_add_watch_full_unlocked (bus, priority, gst_bus_async_signal_func,
1351       NULL, NULL);
1352
1353   if (G_UNLIKELY (!bus->priv->signal_watch))
1354     goto add_failed;
1355
1356 done:
1357
1358   bus->priv->num_signal_watchers++;
1359
1360   GST_OBJECT_UNLOCK (bus);
1361   return;
1362
1363   /* ERRORS */
1364 add_failed:
1365   {
1366     g_critical ("Could not add signal watch to bus %s", GST_OBJECT_NAME (bus));
1367     GST_OBJECT_UNLOCK (bus);
1368     return;
1369   }
1370 }
1371
1372 /**
1373  * gst_bus_add_signal_watch:
1374  * @bus: a #GstBus on which you want to receive the "message" signal
1375  *
1376  * Adds a bus signal watch to the default main context with the default priority
1377  * (%G_PRIORITY_DEFAULT). It is also possible to use a non-default
1378  * main context set up using g_main_context_push_thread_default() (before
1379  * one had to create a bus watch source and attach it to the desired main
1380  * context 'manually').
1381  *
1382  * After calling this statement, the bus will emit the "message" signal for each
1383  * message posted on the bus.
1384  *
1385  * This function may be called multiple times. To clean up, the caller is
1386  * responsible for calling gst_bus_remove_signal_watch() as many times as this
1387  * function is called.
1388  *
1389  * MT safe.
1390  */
1391 void
1392 gst_bus_add_signal_watch (GstBus * bus)
1393 {
1394   gst_bus_add_signal_watch_full (bus, G_PRIORITY_DEFAULT);
1395 }
1396
1397 /**
1398  * gst_bus_remove_signal_watch:
1399  * @bus: a #GstBus you previously added a signal watch to
1400  *
1401  * Removes a signal watch previously added with gst_bus_add_signal_watch().
1402  *
1403  * MT safe.
1404  */
1405 void
1406 gst_bus_remove_signal_watch (GstBus * bus)
1407 {
1408   GSource *source = NULL;
1409
1410   g_return_if_fail (GST_IS_BUS (bus));
1411
1412   /* I know the callees don't take this lock, so go ahead and abuse it */
1413   GST_OBJECT_LOCK (bus);
1414
1415   if (bus->priv->num_signal_watchers == 0)
1416     goto error;
1417
1418   bus->priv->num_signal_watchers--;
1419
1420   if (bus->priv->num_signal_watchers > 0)
1421     goto done;
1422
1423   GST_DEBUG_OBJECT (bus, "removing signal watch %u",
1424       g_source_get_id (bus->priv->signal_watch));
1425
1426   source = bus->priv->signal_watch;
1427
1428 done:
1429   GST_OBJECT_UNLOCK (bus);
1430
1431   if (source)
1432     g_source_destroy (source);
1433
1434   return;
1435
1436   /* ERRORS */
1437 error:
1438   {
1439     g_critical ("Bus %s has no signal watches attached", GST_OBJECT_NAME (bus));
1440     GST_OBJECT_UNLOCK (bus);
1441     return;
1442   }
1443 }