Test that removing probes from within the probe functions works.
[platform/upstream/gstreamer.git] / gst / gstbus.c
1 /* GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  *
4  * gstbus.c: GstBus subsystem
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21 /**
22  * SECTION:gstbus
23  * @short_description: Asynchronous message bus subsystem
24  * @see_also: #GstMessage, #GstElement
25  *
26  * The #GstBus is an object responsible for delivering #GstMessages in
27  * a first-in first-out way from the streaming threads to the application.
28  * 
29  * Since the application typically only wants to deal with delivery of these
30  * messages from one thread, the GstBus will marshall the messages between
31  * different threads. This is important since the actual streaming of media
32  * is done in another thread than the application.
33  * 
34  * The GstBus provides support for #GSource based notifications. This makes it
35  * possible to handle the delivery in the glib mainloop.
36  * 
37  * A message is posted on the bus with the gst_bus_post() method. With the
38  * gst_bus_peek() and _pop() methods one can look at or retrieve a previously
39  * posted message.
40  * 
41  * The bus can be polled with the gst_bus_poll() method. This methods blocks
42  * up to the specified timeout value until one of the specified messages types
43  * is posted on the bus. The application can then _pop() the messages from the
44  * bus to handle them.
45  * Alternatively the application can register an asynchronous bus function using
46  * gst_bus_add_watch_full() orgst_bus_add_watch(). This function will receive
47  * messages a short while after they have been posted.
48  * 
49  * It is also possible to get messages from the bus without any thread 
50  * marshalling with the gst_bus_set_sync_handler() method. This makes it
51  * possible to react to a message in the same thread that posted the
52  * message on the bus. This should only be used if the application is able
53  * to deal with messages from different threads.
54  *
55  * Every #GstBin has one bus.
56  */
57
58 #include <errno.h>
59 #include <unistd.h>
60 #include <sys/types.h>
61 #include <sys/socket.h>
62
63 #include "gst_private.h"
64 #include "gstinfo.h"
65
66 #include "gstbus.h"
67
68 enum
69 {
70   ARG_0,
71 };
72
73 static void gst_bus_class_init (GstBusClass * klass);
74 static void gst_bus_init (GstBus * bus);
75 static void gst_bus_dispose (GObject * object);
76
77 static void gst_bus_set_property (GObject * object, guint prop_id,
78     const GValue * value, GParamSpec * pspec);
79 static void gst_bus_get_property (GObject * object, guint prop_id,
80     GValue * value, GParamSpec * pspec);
81
82 static GstObjectClass *parent_class = NULL;
83
84 /* static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; */
85
86 GType
87 gst_bus_get_type (void)
88 {
89   static GType bus_type = 0;
90
91   if (!bus_type) {
92     static const GTypeInfo bus_info = {
93       sizeof (GstBusClass),
94       NULL,
95       NULL,
96       (GClassInitFunc) gst_bus_class_init,
97       NULL,
98       NULL,
99       sizeof (GstBus),
100       0,
101       (GInstanceInitFunc) gst_bus_init,
102       NULL
103     };
104
105     bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
106   }
107   return bus_type;
108 }
109
110 static void
111 gst_bus_class_init (GstBusClass * klass)
112 {
113   GObjectClass *gobject_class;
114   GstObjectClass *gstobject_class;
115
116   gobject_class = (GObjectClass *) klass;
117   gstobject_class = (GstObjectClass *) klass;
118
119   parent_class = g_type_class_ref (GST_TYPE_OBJECT);
120
121   if (!g_thread_supported ())
122     g_thread_init (NULL);
123
124   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
125   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
126   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
127 }
128
129 static void
130 gst_bus_init (GstBus * bus)
131 {
132   bus->queue = g_queue_new ();
133   bus->queue_lock = g_mutex_new ();
134
135   GST_DEBUG_OBJECT (bus, "created");
136
137   return;
138 }
139
140 static void
141 gst_bus_dispose (GObject * object)
142 {
143   GstBus *bus;
144
145   bus = GST_BUS (object);
146
147   if (bus->queue) {
148     GstMessage *message;
149
150     g_mutex_lock (bus->queue_lock);
151     do {
152       message = g_queue_pop_head (bus->queue);
153       if (message)
154         gst_message_unref (message);
155     } while (message != NULL);
156     g_queue_free (bus->queue);
157     bus->queue = NULL;
158     g_mutex_unlock (bus->queue_lock);
159     g_mutex_free (bus->queue_lock);
160     bus->queue_lock = NULL;
161   }
162
163   G_OBJECT_CLASS (parent_class)->dispose (object);
164 }
165
166 static void
167 gst_bus_set_property (GObject * object, guint prop_id,
168     const GValue * value, GParamSpec * pspec)
169 {
170   GstBus *bus;
171
172   bus = GST_BUS (object);
173
174   switch (prop_id) {
175     default:
176       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
177       break;
178   }
179 }
180
181 static void
182 gst_bus_get_property (GObject * object, guint prop_id,
183     GValue * value, GParamSpec * pspec)
184 {
185   GstBus *bus;
186
187   bus = GST_BUS (object);
188
189   switch (prop_id) {
190     default:
191       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
192       break;
193   }
194 }
195
196 /**
197  * gst_bus_new:
198  *
199  * Creates a new #GstBuus instance.
200  *
201  * Returns: a new #GstBus instance
202  */
203 GstBus *
204 gst_bus_new (void)
205 {
206   GstBus *result;
207
208   result = g_object_new (gst_bus_get_type (), NULL);
209
210   return result;
211 }
212
213 /**
214  * gst_bus_post:
215  * @bus: a #GstBus to post on
216  * @message: The #GstMessage to post
217  *
218  * Post a message on the given bus. Ownership of the message
219  * is taken by the bus.
220  *
221  * Returns: TRUE if the message could be posted.
222  *
223  * MT safe.
224  */
225 gboolean
226 gst_bus_post (GstBus * bus, GstMessage * message)
227 {
228   GstBusSyncReply reply = GST_BUS_PASS;
229   GstBusSyncHandler handler;
230   gpointer handler_data;
231
232   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
233   g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
234
235   GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus, type %d",
236       message, GST_MESSAGE_TYPE (message));
237
238   GST_LOCK (bus);
239   /* check if the bus is flushing */
240   if (GST_FLAG_IS_SET (bus, GST_BUS_FLUSHING))
241     goto is_flushing;
242
243   handler = bus->sync_handler;
244   handler_data = bus->sync_handler_data;
245   GST_UNLOCK (bus);
246
247   /* first call the sync handler if it is installed */
248   if (handler)
249     reply = handler (bus, message, handler_data);
250
251   /* now see what we should do with the message */
252   switch (reply) {
253     case GST_BUS_DROP:
254       /* drop the message */
255       GST_DEBUG_OBJECT (bus, "[msg %p] dropped", message);
256       break;
257     case GST_BUS_PASS:
258       /* pass the message to the async queue, refcount passed in the queue */
259       GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
260       g_mutex_lock (bus->queue_lock);
261       g_queue_push_tail (bus->queue, message);
262       g_mutex_unlock (bus->queue_lock);
263       GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
264
265       /* FIXME cannot assume the source is only in the default context */
266       g_main_context_wakeup (NULL);
267
268       break;
269     case GST_BUS_ASYNC:
270     {
271       /* async delivery, we need a mutex and a cond to block
272        * on */
273       GMutex *lock = g_mutex_new ();
274       GCond *cond = g_cond_new ();
275
276       GST_MESSAGE_COND (message) = cond;
277       GST_MESSAGE_GET_LOCK (message) = lock;
278
279       GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
280
281       /* now we lock the message mutex, send the message to the async
282        * queue. When the message is handled by the app and destroyed,
283        * the cond will be signalled and we can continue */
284       g_mutex_lock (lock);
285       g_mutex_lock (bus->queue_lock);
286       g_queue_push_tail (bus->queue, message);
287       g_mutex_unlock (bus->queue_lock);
288
289       /* FIXME cannot assume the source is only in the default context */
290       g_main_context_wakeup (NULL);
291
292       /* now block till the message is freed */
293       g_cond_wait (cond, lock);
294       g_mutex_unlock (lock);
295
296       GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
297
298       g_mutex_free (lock);
299       g_cond_free (cond);
300       break;
301     }
302     default:
303       g_warning ("invalid return from bus sync handler");
304       break;
305   }
306   return TRUE;
307
308   /* ERRORS */
309 is_flushing:
310   {
311     GST_DEBUG_OBJECT (bus, "bus is flushing");
312     gst_message_unref (message);
313     GST_UNLOCK (bus);
314
315     return FALSE;
316   }
317 }
318
319 /**
320  * gst_bus_have_pending:
321  * @bus: a #GstBus to check
322  * @events: a mask of #GstMessageType, representing the set of message types to
323  * watch for.
324  *
325  * Check if there are pending messages on the bus of the given types that
326  * should be handled.
327  *
328  * Returns: TRUE if there are messages on the bus to be handled.
329  *
330  * MT safe.
331  */
332 gboolean
333 gst_bus_have_pending (GstBus * bus, GstMessageType events)
334 {
335   GstMessage *message;
336   gboolean result;
337
338   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
339
340   g_mutex_lock (bus->queue_lock);
341   /* see if there is a message on the bus that satisfies the
342    * event mask */
343   message = g_queue_peek_head (bus->queue);
344   if (message)
345     result = (GST_MESSAGE_TYPE (message) & events) != 0;
346   else
347     result = FALSE;
348   g_mutex_unlock (bus->queue_lock);
349
350   return result;
351 }
352
353 /**
354  * gst_bus_set_flushing:
355  * @bus: a #GstBus
356  * @flushing: whether or not to flush the bus
357  *
358  * If @flushing, flush out and unref any messages queued in the bus. Releases
359  * references to the message origin objects. Will flush future messages until
360  * gst_bus_set_flushing() sets @flushing to #FALSE.
361  *
362  * MT safe.
363  */
364 void
365 gst_bus_set_flushing (GstBus * bus, gboolean flushing)
366 {
367   GstMessage *message;
368
369   GST_LOCK (bus);
370
371   if (flushing) {
372     GST_FLAG_SET (bus, GST_BUS_FLUSHING);
373
374     GST_DEBUG_OBJECT (bus, "set bus flushing");
375
376     while ((message = gst_bus_pop (bus)))
377       gst_message_unref (message);
378   } else {
379     GST_DEBUG_OBJECT (bus, "unset bus flushing");
380     GST_FLAG_UNSET (bus, GST_BUS_FLUSHING);
381   }
382
383   GST_UNLOCK (bus);
384 }
385
386 /**
387  * gst_bus_pop:
388  * @bus: a #GstBus to pop
389  *
390  * Get a message from the bus.
391  *
392  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
393  *
394  * MT safe.
395  */
396 GstMessage *
397 gst_bus_pop (GstBus * bus)
398 {
399   GstMessage *message;
400
401   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
402
403   g_mutex_lock (bus->queue_lock);
404   message = g_queue_pop_head (bus->queue);
405   g_mutex_unlock (bus->queue_lock);
406
407   GST_DEBUG_OBJECT (bus, "pop on bus, got message %p", message);
408
409   return message;
410 }
411
412 /**
413  * gst_bus_peek:
414  * @bus: a #GstBus
415  *
416  * Peek the message on the top of the bus' queue. The message will remain 
417  * on the bus' message queue. A reference is returned, and needs to be unreffed
418  * by the caller.
419  *
420  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
421  *
422  * MT safe.
423  */
424 GstMessage *
425 gst_bus_peek (GstBus * bus)
426 {
427   GstMessage *message;
428
429   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
430
431   g_mutex_lock (bus->queue_lock);
432   message = g_queue_peek_head (bus->queue);
433   if (message)
434     gst_message_ref (message);
435   g_mutex_unlock (bus->queue_lock);
436
437   GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
438
439   return message;
440 }
441
442 /**
443  * gst_bus_set_sync_handler:
444  * @bus: a #GstBus to install the handler on
445  * @func: The handler function to install
446  * @data: User data that will be sent to the handler function.
447  *
448  * Sets the synchronous handler on the bus. The function will be called
449  * every time a new message is posted on the bus. Note that the function
450  * will be called in the same thread context as the posting object. This
451  * function is usually only called by the creator of the bus. Applications
452  * should handle messages asynchronously using the gst_bus watch and poll
453  * functions.
454  */
455 void
456 gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
457 {
458   g_return_if_fail (GST_IS_BUS (bus));
459
460   GST_LOCK (bus);
461
462   /* Assert if the user attempts to replace an existing sync_handler,
463    * other than to clear it */
464   g_assert (func == NULL || bus->sync_handler == NULL);
465
466   bus->sync_handler = func;
467   bus->sync_handler_data = data;
468   GST_UNLOCK (bus);
469 }
470
471 /* GSource for the bus
472  */
473 typedef struct
474 {
475   GSource source;
476   GstBus *bus;
477   GstMessageType events;
478 } GstBusSource;
479
480 static gboolean
481 gst_bus_source_prepare (GSource * source, gint * timeout)
482 {
483   GstBusSource *bsrc = (GstBusSource *) source;
484
485   *timeout = -1;
486   return gst_bus_have_pending (bsrc->bus, bsrc->events);
487 }
488
489 static gboolean
490 gst_bus_source_check (GSource * source)
491 {
492   GstBusSource *bsrc = (GstBusSource *) source;
493
494   return gst_bus_have_pending (bsrc->bus, bsrc->events);
495 }
496
497 static gboolean
498 gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
499     gpointer user_data)
500 {
501   GstBusFunc handler = (GstBusFunc) callback;
502   GstBusSource *bsource = (GstBusSource *) source;
503   GstMessage *message;
504   gboolean keep;
505   GstBus *bus;
506
507   g_return_val_if_fail (bsource != NULL, FALSE);
508
509   bus = bsource->bus;
510
511   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
512
513   message = gst_bus_pop (bus);
514   g_return_val_if_fail (message != NULL, FALSE);
515
516   if (!handler)
517     goto no_handler;
518
519   GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
520
521   keep = handler (bus, message, user_data);
522   gst_message_unref (message);
523
524   GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, keep);
525
526   return keep;
527
528 no_handler:
529   {
530     g_warning ("GstBus watch dispatched without callback\n"
531         "You must call g_source_connect().");
532     gst_message_unref (message);
533     return FALSE;
534   }
535 }
536
537 static void
538 gst_bus_source_finalize (GSource * source)
539 {
540   GstBusSource *bsource = (GstBusSource *) source;
541
542   gst_object_unref (bsource->bus);
543   bsource->bus = NULL;
544 }
545
546 static GSourceFuncs gst_bus_source_funcs = {
547   gst_bus_source_prepare,
548   gst_bus_source_check,
549   gst_bus_source_dispatch,
550   gst_bus_source_finalize
551 };
552
553 /**
554  * gst_bus_create_watch:
555  * @bus: a #GstBus to create the watch for
556  * @events: a mask of #GstMessageType, representing the set of message types to
557  * watch for.
558  *
559  * Create watch for this bus. The source will only act on messages of the
560  * given types, messages of other types will simply remain on the bus and 
561  * this GSource will not be dispatched again before the message is popped off
562  * the bus. For this reason one typically has a low priority GSource that
563  * pops all remaining messages from the bus not handled by the other GSources.
564  *
565  * Returns: A #GSource that can be added to a mainloop.
566  */
567 GSource *
568 gst_bus_create_watch (GstBus * bus, GstMessageType events)
569 {
570   GstBusSource *source;
571
572   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
573
574   source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
575       sizeof (GstBusSource));
576   gst_object_ref (bus);
577   source->bus = bus;
578   source->events = events;
579
580   return (GSource *) source;
581 }
582
583 /**
584  * gst_bus_add_watch_full:
585  * @bus: a #GstBus to create the watch for.
586  * @priority: The priority of the watch.
587  * @events: a mask of #GstMessageType, representing the set of message types to
588  * watch for.
589  * @func: A function to call when a message is received.
590  * @user_data: user data passed to @func.
591  * @notify: the function to call when the source is removed.
592  *
593  * Adds the bus to the mainloop with the given priority. If the func returns
594  * FALSE, the func will be removed. 
595  *
596  * When the func is called, the message belongs to the caller; if you want to 
597  * keep a copy of it, call gst_message_ref before leaving the func.
598  *
599  * Returns: The event source id.
600  *
601  * MT safe.
602  */
603 guint
604 gst_bus_add_watch_full (GstBus * bus, gint priority, GstMessageType events,
605     GstBusFunc func, gpointer user_data, GDestroyNotify notify)
606 {
607   guint id;
608   GSource *source;
609
610   g_return_val_if_fail (GST_IS_BUS (bus), 0);
611
612   source = gst_bus_create_watch (bus, events);
613
614   if (priority != G_PRIORITY_DEFAULT)
615     g_source_set_priority (source, priority);
616
617   g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
618
619   id = g_source_attach (source, NULL);
620   g_source_unref (source);
621
622   GST_DEBUG_OBJECT (bus, "New source %p", source);
623   return id;
624 }
625
626 /**
627  * gst_bus_add_watch:
628  * @bus: a #GstBus to create the watch for
629  * @events: a mask of #GstMessageType, representing the set of message types to
630  * watch for.
631  * @func: A function to call when a message is received.
632  * @user_data: user data passed to @func.
633  *
634  * Adds the bus to the mainloop with the default priority.
635  *
636  * Returns: The event source id.
637  *
638  * MT safe.
639  */
640 guint
641 gst_bus_add_watch (GstBus * bus, GstMessageType events, GstBusFunc func,
642     gpointer user_data)
643 {
644   return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, events, func,
645       user_data, NULL);
646 }
647
648 typedef struct
649 {
650   GMainLoop *loop;
651   guint timeout_id;
652   gboolean source_running;
653   GstMessageType events;
654   GstMessage *message;
655 } GstBusPollData;
656
657 static gboolean
658 poll_func (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
659 {
660   if (!g_main_loop_is_running (poll_data->loop)) {
661     GST_DEBUG ("mainloop %p not running", poll_data->loop);
662     return TRUE;
663   }
664
665   if (GST_MESSAGE_TYPE (message) & poll_data->events) {
666     g_return_val_if_fail (poll_data->message == NULL, FALSE);
667     /* keep ref to message */
668     poll_data->message = gst_message_ref (message);
669     GST_DEBUG ("mainloop %p quit", poll_data->loop);
670     g_main_loop_quit (poll_data->loop);
671   } else {
672     /* don't remove the source. */
673   }
674   /* we always keep the source alive so that we don't accidentialy
675    * free the poll_data */
676   return TRUE;
677 }
678
679 static gboolean
680 poll_timeout (GstBusPollData * poll_data)
681 {
682   GST_DEBUG ("mainloop %p quit", poll_data->loop);
683   g_main_loop_quit (poll_data->loop);
684
685   /* we don't remove the GSource as this would free our poll_data,
686    * which we still need */
687   return TRUE;
688 }
689
690 static void
691 poll_destroy (GstBusPollData * poll_data)
692 {
693   poll_data->source_running = FALSE;
694   if (!poll_data->timeout_id) {
695     g_main_loop_unref (poll_data->loop);
696     g_free (poll_data);
697   }
698 }
699
700 static void
701 poll_destroy_timeout (GstBusPollData * poll_data)
702 {
703   poll_data->timeout_id = 0;
704   if (!poll_data->source_running) {
705     g_main_loop_unref (poll_data->loop);
706     g_free (poll_data);
707   }
708 }
709
710 /**
711  * gst_bus_poll:
712  * @bus: a #GstBus
713  * @events: a mask of #GstMessageType, representing the set of message types to
714  * poll for.
715  * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely.
716  *
717  * Poll the bus for events. Will block while waiting for events to come. You can
718  * specify a maximum time to poll with the @timeout parameter. If @timeout is
719  * negative, this function will block indefinitely.
720  *
721  * This function will enter the default mainloop while polling.
722  *
723  * Returns: The message that was received, or NULL if the poll timed out. 
724  * The message is taken from the bus and needs to be unreffed after usage.
725  */
726 GstMessage *
727 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
728 {
729   GstBusPollData *poll_data;
730   GstMessage *ret;
731   guint id;
732
733   poll_data = g_new0 (GstBusPollData, 1);
734   poll_data->source_running = TRUE;
735   poll_data->loop = g_main_loop_new (NULL, FALSE);
736   poll_data->events = events;
737   poll_data->message = NULL;
738
739   if (timeout >= 0)
740     poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
741         timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
742         (GDestroyNotify) poll_destroy_timeout);
743   else
744     poll_data->timeout_id = 0;
745
746   id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, GST_MESSAGE_ANY,
747       (GstBusFunc) poll_func, poll_data, (GDestroyNotify) poll_destroy);
748
749   GST_DEBUG ("running mainloop %p", poll_data->loop);
750   g_main_loop_run (poll_data->loop);
751   GST_DEBUG ("mainloop stopped %p", poll_data->loop);
752   /* holds a ref */
753   ret = poll_data->message;
754
755   if (poll_data->timeout_id)
756     g_source_remove (poll_data->timeout_id);
757
758   /* poll_data may get destroyed at any time now */
759   g_source_remove (id);
760
761   GST_DEBUG_OBJECT (bus, "finished poll with message %p", ret);
762
763   return ret;
764 }