b5da3ca58a050a8a5308369218f5ba60a51b78f6
[platform/upstream/gstreamer.git] / gst / gstbus.c
1 /* GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  *
4  * gstbus.c: GstBus subsystem
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22
23 #include <errno.h>
24 #include <unistd.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27
28 #include "gst_private.h"
29 #include "gstinfo.h"
30
31 #include "gstbus.h"
32
33 enum
34 {
35   ARG_0,
36 };
37
38 static void gst_bus_class_init (GstBusClass * klass);
39 static void gst_bus_init (GstBus * bus);
40 static void gst_bus_dispose (GObject * object);
41
42 static void gst_bus_set_property (GObject * object, guint prop_id,
43     const GValue * value, GParamSpec * pspec);
44 static void gst_bus_get_property (GObject * object, guint prop_id,
45     GValue * value, GParamSpec * pspec);
46
47 static GstObjectClass *parent_class = NULL;
48
49 /* static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; */
50
51 GType
52 gst_bus_get_type (void)
53 {
54   static GType bus_type = 0;
55
56   if (!bus_type) {
57     static const GTypeInfo bus_info = {
58       sizeof (GstBusClass),
59       NULL,
60       NULL,
61       (GClassInitFunc) gst_bus_class_init,
62       NULL,
63       NULL,
64       sizeof (GstBus),
65       0,
66       (GInstanceInitFunc) gst_bus_init,
67       NULL
68     };
69
70     bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
71   }
72   return bus_type;
73 }
74
75 static void
76 gst_bus_class_init (GstBusClass * klass)
77 {
78   GObjectClass *gobject_class;
79   GstObjectClass *gstobject_class;
80
81   gobject_class = (GObjectClass *) klass;
82   gstobject_class = (GstObjectClass *) klass;
83
84   parent_class = g_type_class_ref (GST_TYPE_OBJECT);
85
86   if (!g_thread_supported ())
87     g_thread_init (NULL);
88
89   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
90   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
91   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
92 }
93
94 static void
95 gst_bus_init (GstBus * bus)
96 {
97   bus->queue = g_queue_new ();
98   bus->queue_lock = g_mutex_new ();
99
100   GST_DEBUG_OBJECT (bus, "created");
101
102   return;
103 }
104
105 static void
106 gst_bus_dispose (GObject * object)
107 {
108   GstBus *bus;
109
110   bus = GST_BUS (object);
111
112   if (bus->queue) {
113     g_mutex_lock (bus->queue_lock);
114     g_queue_free (bus->queue);
115     bus->queue = NULL;
116     g_mutex_unlock (bus->queue_lock);
117     g_mutex_free (bus->queue_lock);
118     bus->queue_lock = NULL;
119   }
120
121   G_OBJECT_CLASS (parent_class)->dispose (object);
122 }
123
124 static void
125 gst_bus_set_property (GObject * object, guint prop_id,
126     const GValue * value, GParamSpec * pspec)
127 {
128   GstBus *bus;
129
130   bus = GST_BUS (object);
131
132   switch (prop_id) {
133     default:
134       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
135       break;
136   }
137 }
138
139 static void
140 gst_bus_get_property (GObject * object, guint prop_id,
141     GValue * value, GParamSpec * pspec)
142 {
143   GstBus *bus;
144
145   bus = GST_BUS (object);
146
147   switch (prop_id) {
148     default:
149       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
150       break;
151   }
152 }
153
154 GstBus *
155 gst_bus_new (void)
156 {
157   GstBus *result;
158
159   result = g_object_new (gst_bus_get_type (), NULL);
160
161   return result;
162 }
163
164 /**
165  * gst_bus_post:
166  * @bus: a #GstBus to post on
167  * @message: The #GstMessage to post
168  *
169  * Post a message on the given bus. Ownership of the message
170  * is taken by the bus.
171  *
172  * Returns: TRUE if the message could be posted.
173  *
174  * MT safe.
175  */
176 gboolean
177 gst_bus_post (GstBus * bus, GstMessage * message)
178 {
179   GstBusSyncReply reply = GST_BUS_PASS;
180   GstBusSyncHandler handler;
181   gpointer handler_data;
182
183   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
184   g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
185
186   GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus, type %d",
187       message, GST_MESSAGE_TYPE (message));
188
189   GST_LOCK (bus);
190   if (GST_FLAG_IS_SET (bus, GST_BUS_FLUSHING)) {
191     GST_DEBUG_OBJECT (bus, "bus is flushing");
192     gst_message_unref (message);
193     GST_UNLOCK (bus);
194     return FALSE;
195   }
196
197   handler = bus->sync_handler;
198   handler_data = bus->sync_handler_data;
199
200   GST_UNLOCK (bus);
201
202   /* first call the sync handler if it is installed */
203   if (handler) {
204     reply = handler (bus, message, handler_data);
205   }
206
207   /* now see what we should do with the message */
208   switch (reply) {
209     case GST_BUS_DROP:
210       /* drop the message */
211       GST_DEBUG_OBJECT (bus, "[msg %p] dropped", message);
212       break;
213     case GST_BUS_PASS:
214       /* pass the message to the async queue, refcount passed in the queue */
215       GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
216       g_mutex_lock (bus->queue_lock);
217       g_queue_push_tail (bus->queue, message);
218       g_mutex_unlock (bus->queue_lock);
219       GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
220
221       /* FIXME cannot assume the source is only in the default context */
222       g_main_context_wakeup (NULL);
223
224       break;
225     case GST_BUS_ASYNC:
226     {
227       /* async delivery, we need a mutex and a cond to block
228        * on */
229       GMutex *lock = g_mutex_new ();
230       GCond *cond = g_cond_new ();
231
232       GST_MESSAGE_COND (message) = cond;
233       GST_MESSAGE_GET_LOCK (message) = lock;
234
235       GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
236
237       /* now we lock the message mutex, send the message to the async
238        * queue. When the message is handled by the app and destroyed,
239        * the cond will be signalled and we can continue */
240       g_mutex_lock (lock);
241       g_mutex_lock (bus->queue_lock);
242       g_queue_push_tail (bus->queue, message);
243       g_mutex_unlock (bus->queue_lock);
244
245       /* FIXME cannot assume the source is only in the default context */
246       g_main_context_wakeup (NULL);
247
248       /* now block till the message is freed */
249       g_cond_wait (cond, lock);
250       g_mutex_unlock (lock);
251
252       GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
253
254       g_mutex_free (lock);
255       g_cond_free (cond);
256       break;
257     }
258   }
259
260   return TRUE;
261 }
262
263 /**
264  * gst_bus_have_pending:
265  * @bus: a #GstBus to check
266  *
267  * Check if there are pending messages on the bus that should be 
268  * handled.
269  *
270  * Returns: TRUE if there are messages on the bus to be handled.
271  *
272  * MT safe.
273  */
274 gboolean
275 gst_bus_have_pending (GstBus * bus)
276 {
277   gint length;
278
279   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
280
281   g_mutex_lock (bus->queue_lock);
282   length = g_queue_get_length (bus->queue);
283   g_mutex_unlock (bus->queue_lock);
284
285   return (length > 0);
286 }
287
288 /**
289  * gst_bus_set_flushing:
290  * @bus: a #GstBus
291  * @flushing: whether or not to flush the bus
292  *
293  * If @flushing, flush out and unref any messages queued in the bus. Releases
294  * references to the message origin objects. Will flush future messages until
295  * gst_bus_set_flushing() sets @flushing to #FALSE.
296  *
297  * MT safe.
298  */
299 void
300 gst_bus_set_flushing (GstBus * bus, gboolean flushing)
301 {
302   GstMessage *message;
303
304   GST_LOCK (bus);
305
306   if (flushing) {
307     GST_FLAG_SET (bus, GST_BUS_FLUSHING);
308
309     GST_DEBUG_OBJECT (bus, "set bus flushing");
310
311     while ((message = gst_bus_pop (bus)))
312       gst_message_unref (message);
313   } else {
314     GST_DEBUG_OBJECT (bus, "unset bus flushing");
315     GST_FLAG_UNSET (bus, GST_BUS_FLUSHING);
316   }
317
318   GST_UNLOCK (bus);
319 }
320
321 /**
322  * gst_bus_pop:
323  * @bus: a #GstBus to pop
324  *
325  * Get a message from the bus.
326  *
327  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
328  *
329  * MT safe.
330  */
331 GstMessage *
332 gst_bus_pop (GstBus * bus)
333 {
334   GstMessage *message;
335
336   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
337
338   g_mutex_lock (bus->queue_lock);
339   message = g_queue_pop_head (bus->queue);
340   g_mutex_unlock (bus->queue_lock);
341
342   GST_DEBUG_OBJECT (bus, "pop on bus, got message %p", message);
343
344   return message;
345 }
346
347 /**
348  * gst_bus_peek:
349  * @bus: a #GstBus
350  *
351  * Peek the message on the top of the bus' queue. The message will remain 
352  * on the bus' message queue. A reference is returned, and needs to be freed
353  * by the caller.
354  *
355  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
356  *
357  * MT safe.
358  */
359 GstMessage *
360 gst_bus_peek (GstBus * bus)
361 {
362   GstMessage *message;
363
364   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
365
366   g_mutex_lock (bus->queue_lock);
367   message = g_queue_peek_head (bus->queue);
368   if (message)
369     gst_message_ref (message);
370   g_mutex_unlock (bus->queue_lock);
371
372   GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
373
374   return message;
375 }
376
377 /**
378  * gst_bus_set_sync_handler:
379  * @bus: a #GstBus to install the handler on
380  * @func: The handler function to install
381  * @data: User data that will be sent to the handler function.
382  *
383  * Install a synchronous handler on the bus. The function will be called
384  * every time a new message is posted on the bus. Note that the function
385  * will be called in the same thread context as the posting object.
386  */
387 void
388 gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
389 {
390   g_return_if_fail (GST_IS_BUS (bus));
391
392   GST_LOCK (bus);
393   bus->sync_handler = func;
394   bus->sync_handler_data = data;
395   GST_UNLOCK (bus);
396 }
397
398 /* GSource for the bus
399  */
400 typedef struct
401 {
402   GSource source;
403   GstBus *bus;
404 } GstBusSource;
405
406 static gboolean
407 gst_bus_source_prepare (GSource * source, gint * timeout)
408 {
409   *timeout = -1;
410   return gst_bus_have_pending (((GstBusSource *) source)->bus);
411 }
412
413 static gboolean
414 gst_bus_source_check (GSource * source)
415 {
416   return gst_bus_have_pending (((GstBusSource *) source)->bus);
417 }
418
419 static gboolean
420 gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
421     gpointer user_data)
422 {
423   GstBusHandler handler = (GstBusHandler) callback;
424   GstBusSource *bsource = (GstBusSource *) source;
425   GstMessage *message;
426   gboolean needs_pop = TRUE;
427   GstBus *bus;
428
429   g_return_val_if_fail (bsource != NULL, FALSE);
430
431   bus = bsource->bus;
432
433   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
434
435   message = gst_bus_peek (bus);
436
437   GST_DEBUG_OBJECT (bus, "source %p have message %p", source, message);
438
439   g_return_val_if_fail (message != NULL, TRUE);
440
441   if (!handler)
442     goto no_handler;
443
444   GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
445
446   needs_pop = handler (bus, message, user_data);
447   gst_message_unref (message);
448
449   GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, needs_pop);
450   if (needs_pop) {
451     message = gst_bus_pop (bus);
452     if (message) {
453       gst_message_unref (message);
454     } else {
455       /* after executing the handler, the app could have disposed
456        * the pipeline and set the bus to flushing. It is possible
457        * then that there are no more messages on the bus. this is
458        * not a problem. */
459       GST_DEBUG ("handler requested pop but no message on the bus");
460     }
461   }
462   return TRUE;
463
464 no_handler:
465   {
466     g_warning ("GstBus watch dispatched without callback\n"
467         "You must call g_source_connect().");
468     gst_message_unref (message);
469     return FALSE;
470   }
471 }
472
473 static void
474 gst_bus_source_finalize (GSource * source)
475 {
476   GstBusSource *bsource = (GstBusSource *) source;
477
478   gst_object_unref (bsource->bus);
479   bsource->bus = NULL;
480 }
481
482 static GSourceFuncs gst_bus_source_funcs = {
483   gst_bus_source_prepare,
484   gst_bus_source_check,
485   gst_bus_source_dispatch,
486   gst_bus_source_finalize
487 };
488
489 /**
490  * gst_bus_create_watch:
491  * @bus: a #GstBus to create the watch for
492  *
493  * Create watch for this bus. 
494  *
495  * Returns: A #GSource that can be added to a mainloop.
496  */
497 GSource *
498 gst_bus_create_watch (GstBus * bus)
499 {
500   GstBusSource *source;
501
502   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
503
504   source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
505       sizeof (GstBusSource));
506   gst_object_ref (bus);
507   source->bus = bus;
508
509   return (GSource *) source;
510 }
511
512 /**
513  * gst_bus_add_watch_full:
514  * @bus: a #GstBus to create the watch for.
515  * @priority: The priority of the watch.
516  * @handler: A function to call when a message is received.
517  * @user_data: user data passed to @handler.
518  * @notify: the function to call when the source is removed.
519  *
520  * Adds the bus to the mainloop with the given priority. If the handler returns
521  * TRUE, the message will then be popped off the queue. When the handler is
522  * called, the message belongs to the caller; if you want to keep a copy of it,
523  * call gst_message_ref before leaving the handler.
524  *
525  * Returns: The event source id.
526  *
527  * MT safe.
528  */
529 guint
530 gst_bus_add_watch_full (GstBus * bus, gint priority,
531     GstBusHandler handler, gpointer user_data, GDestroyNotify notify)
532 {
533   guint id;
534   GSource *source;
535
536   g_return_val_if_fail (GST_IS_BUS (bus), 0);
537
538   source = gst_bus_create_watch (bus);
539
540   if (priority != G_PRIORITY_DEFAULT)
541     g_source_set_priority (source, priority);
542
543   g_source_set_callback (source, (GSourceFunc) handler, user_data, notify);
544
545   id = g_source_attach (source, NULL);
546   g_source_unref (source);
547
548   GST_DEBUG_OBJECT (bus, "New source %p", source);
549   return id;
550 }
551
552 /**
553  * gst_bus_add_watch:
554  * @bus: a #GstBus to create the watch for
555  * @handler: A function to call when a message is received.
556  * @user_data: user data passed to @handler.
557  *
558  * Adds the bus to the mainloop with the default priority.
559  *
560  * Returns: The event source id.
561  *
562  * MT safe.
563  */
564 guint
565 gst_bus_add_watch (GstBus * bus, GstBusHandler handler, gpointer user_data)
566 {
567   return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, handler, user_data,
568       NULL);
569 }
570
571 typedef struct
572 {
573   GMainLoop *loop;
574   guint timeout_id;
575   gboolean source_running;
576   GstMessageType events;
577   GstMessageType revent;
578 } GstBusPollData;
579
580 static gboolean
581 poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
582 {
583   if (!g_main_loop_is_running (poll_data->loop))
584     return FALSE;
585
586   if (GST_MESSAGE_TYPE (message) & poll_data->events) {
587     poll_data->revent = GST_MESSAGE_TYPE (message);
588     g_main_loop_quit (poll_data->loop);
589
590     /* keep the message on the queue */
591     return FALSE;
592   } else {
593     /* pop and unref the message */
594     return TRUE;
595   }
596 }
597
598 static gboolean
599 poll_timeout (GstBusPollData * poll_data)
600 {
601   g_main_loop_quit (poll_data->loop);
602
603   /* returning FALSE will remove the source id */
604   return FALSE;
605 }
606
607 static void
608 poll_destroy (GstBusPollData * poll_data)
609 {
610   poll_data->source_running = FALSE;
611   if (!poll_data->timeout_id) {
612     g_main_loop_unref (poll_data->loop);
613     g_free (poll_data);
614   }
615 }
616
617 static void
618 poll_destroy_timeout (GstBusPollData * poll_data)
619 {
620   poll_data->timeout_id = 0;
621   if (!poll_data->source_running) {
622     g_main_loop_unref (poll_data->loop);
623     g_free (poll_data);
624   }
625 }
626
627 /**
628  * gst_bus_poll:
629  * @bus: a #GstBus
630  * @events: a mask of #GstMessageType, representing the set of message types to
631  * poll for.
632  * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely.
633  *
634  * Poll the bus for events. Will block while waiting for events to come. You can
635  * specify a maximum time to poll with the @timeout parameter. If @timeout is
636  * negative, this function will block indefinitely.
637  *
638  * This function will enter the default mainloop while polling.
639  *
640  * Returns: The type of the message that was received, or GST_MESSAGE_UNKNOWN if
641  * the poll timed out. The message will remain in the bus queue; you will need
642  * to gst_bus_pop() it off before entering gst_bus_poll() again.
643  */
644 GstMessageType
645 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
646 {
647   GstBusPollData *poll_data;
648   GstMessageType ret;
649   guint id;
650
651   poll_data = g_new0 (GstBusPollData, 1);
652   g_return_val_if_fail (poll_data != NULL, GST_MESSAGE_UNKNOWN);
653
654   poll_data->source_running = TRUE;
655   poll_data->loop = g_main_loop_new (NULL, FALSE);
656   poll_data->events = events;
657   poll_data->revent = GST_MESSAGE_UNKNOWN;
658
659   if (timeout >= 0)
660     poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
661         timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
662         (GDestroyNotify) poll_destroy_timeout);
663   else
664     poll_data->timeout_id = 0;
665
666   id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE,
667       (GstBusHandler) poll_handler, poll_data, (GDestroyNotify) poll_destroy);
668   g_main_loop_run (poll_data->loop);
669   ret = poll_data->revent;
670
671   if (poll_data->timeout_id)
672     g_source_remove (poll_data->timeout_id);
673
674   /* poll_data may get destroyed at any time now */
675   g_source_remove (id);
676
677   GST_DEBUG_OBJECT (bus, "finished poll with messagetype %d", ret);
678
679   return ret;
680 }