e53ff6ad98edec56577b68d636fad0789be399fb
[platform/upstream/gstreamer.git] / gst / gstbus.c
1 /* GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  *
4  * gstbus.c: GstBus subsystem
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22
23 #include <errno.h>
24 #include <unistd.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27
28 #include "gst_private.h"
29 #include "gstinfo.h"
30
31 #include "gstbus.h"
32
33 enum
34 {
35   ARG_0,
36 };
37
38 static void gst_bus_class_init (GstBusClass * klass);
39 static void gst_bus_init (GstBus * bus);
40 static void gst_bus_dispose (GObject * object);
41
42 static void gst_bus_set_property (GObject * object, guint prop_id,
43     const GValue * value, GParamSpec * pspec);
44 static void gst_bus_get_property (GObject * object, guint prop_id,
45     GValue * value, GParamSpec * pspec);
46
47 static GstObjectClass *parent_class = NULL;
48
49 /* static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; */
50
51 GType
52 gst_bus_get_type (void)
53 {
54   static GType bus_type = 0;
55
56   if (!bus_type) {
57     static const GTypeInfo bus_info = {
58       sizeof (GstBusClass),
59       NULL,
60       NULL,
61       (GClassInitFunc) gst_bus_class_init,
62       NULL,
63       NULL,
64       sizeof (GstBus),
65       0,
66       (GInstanceInitFunc) gst_bus_init,
67       NULL
68     };
69
70     bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
71   }
72   return bus_type;
73 }
74
75 static void
76 gst_bus_class_init (GstBusClass * klass)
77 {
78   GObjectClass *gobject_class;
79   GstObjectClass *gstobject_class;
80
81   gobject_class = (GObjectClass *) klass;
82   gstobject_class = (GstObjectClass *) klass;
83
84   parent_class = g_type_class_ref (GST_TYPE_OBJECT);
85
86   if (!g_thread_supported ())
87     g_thread_init (NULL);
88
89   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
90   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
91   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
92 }
93
94 static void
95 gst_bus_init (GstBus * bus)
96 {
97   bus->queue = g_queue_new ();
98   bus->queue_lock = g_mutex_new ();
99
100   return;
101 }
102
103 static void
104 gst_bus_dispose (GObject * object)
105 {
106   GstBus *bus;
107
108   bus = GST_BUS (object);
109
110   if (bus->queue) {
111     g_mutex_lock (bus->queue_lock);
112     g_queue_free (bus->queue);
113     bus->queue = NULL;
114     g_mutex_unlock (bus->queue_lock);
115     g_mutex_free (bus->queue_lock);
116     bus->queue_lock = NULL;
117   }
118
119   G_OBJECT_CLASS (parent_class)->dispose (object);
120 }
121
122 static void
123 gst_bus_set_property (GObject * object, guint prop_id,
124     const GValue * value, GParamSpec * pspec)
125 {
126   GstBus *bus;
127
128   bus = GST_BUS (object);
129
130   switch (prop_id) {
131     default:
132       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
133       break;
134   }
135 }
136
137 static void
138 gst_bus_get_property (GObject * object, guint prop_id,
139     GValue * value, GParamSpec * pspec)
140 {
141   GstBus *bus;
142
143   bus = GST_BUS (object);
144
145   switch (prop_id) {
146     default:
147       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
148       break;
149   }
150 }
151
152 GstBus *
153 gst_bus_new (void)
154 {
155   GstBus *result;
156
157   result = g_object_new (gst_bus_get_type (), NULL);
158
159   return result;
160 }
161
162 /**
163  * gst_bus_post:
164  * @bus: a #GstBus to post on
165  * @message: The #GstMessage to post
166  *
167  * Post a message on the given bus.
168  *
169  * Returns: TRUE if the message could be posted.
170  *
171  * MT safe.
172  */
173 gboolean
174 gst_bus_post (GstBus * bus, GstMessage * message)
175 {
176   GstBusSyncReply reply = GST_BUS_PASS;
177   GstBusSyncHandler handler;
178   gpointer handler_data;
179
180   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
181   g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
182
183   GST_DEBUG_OBJECT (bus, "posting message on bus, type %d",
184       GST_MESSAGE_TYPE (message));
185
186   GST_LOCK (bus);
187   if (GST_FLAG_IS_SET (bus, GST_BUS_FLUSHING)) {
188     gst_message_unref (message);
189     GST_UNLOCK (bus);
190     return FALSE;
191   }
192
193   handler = bus->sync_handler;
194   handler_data = bus->sync_handler_data;
195
196   GST_UNLOCK (bus);
197
198   /* first call the sync handler if it is installed */
199   if (handler) {
200     reply = handler (bus, message, handler_data);
201   }
202
203   /* now see what we should do with the message */
204   switch (reply) {
205     case GST_BUS_DROP:
206       /* drop the message */
207       break;
208     case GST_BUS_PASS:
209       /* pass the message to the async queue */
210       g_mutex_lock (bus->queue_lock);
211       g_queue_push_tail (bus->queue, message);
212       g_mutex_unlock (bus->queue_lock);
213
214       /* FIXME cannot assume the source is only in the default context */
215       g_main_context_wakeup (NULL);
216
217       break;
218     case GST_BUS_ASYNC:
219     {
220       /* async delivery, we need a mutex and a cond to block
221        * on */
222       GMutex *lock = g_mutex_new ();
223       GCond *cond = g_cond_new ();
224
225       GST_MESSAGE_COND (message) = cond;
226       GST_MESSAGE_GET_LOCK (message) = lock;
227
228       GST_DEBUG ("waiting for async delivery of message %p", message);
229
230       /* now we lock the message mutex, send the message to the async
231        * queue. When the message is handled by the app and destroyed, 
232        * the cond will be signalled and we can continue */
233       g_mutex_lock (lock);
234       g_mutex_lock (bus->queue_lock);
235       g_queue_push_tail (bus->queue, message);
236       g_mutex_unlock (bus->queue_lock);
237
238       /* FIXME cannot assume the source is only in the default context */
239       g_main_context_wakeup (NULL);
240
241       /* now block till the message is freed */
242       g_cond_wait (cond, lock);
243       g_mutex_unlock (lock);
244
245       GST_DEBUG ("message %p delivered asynchronously", message);
246
247       g_mutex_free (lock);
248       g_cond_free (cond);
249       break;
250     }
251   }
252
253   return TRUE;
254 }
255
256 /**
257  * gst_bus_have_pending:
258  * @bus: a #GstBus to check
259  *
260  * Check if there are pending messages on the bus that should be 
261  * handled.
262  *
263  * Returns: TRUE if there are messages on the bus to be handled.
264  *
265  * MT safe.
266  */
267 gboolean
268 gst_bus_have_pending (GstBus * bus)
269 {
270   gint length;
271
272   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
273
274   g_mutex_lock (bus->queue_lock);
275   length = g_queue_get_length (bus->queue);
276   g_mutex_unlock (bus->queue_lock);
277
278   return (length > 0);
279 }
280
281 /**
282  * gst_bus_set_flushing:
283  * @bus: a #GstBus
284  * @flushing: whether or not to flush the bus
285  *
286  * If @flushing, flush out and unref any messages queued in the bus. Releases
287  * references to the message origin objects. Will flush future messages until
288  * gst_bus_set_flushing() sets @flushing to #FALSE.
289  *
290  * MT safe.
291  */
292 void
293 gst_bus_set_flushing (GstBus * bus, gboolean flushing)
294 {
295   GstMessage *message;
296
297   GST_LOCK (bus);
298
299   if (flushing) {
300     GST_FLAG_SET (bus, GST_BUS_FLUSHING);
301
302     while ((message = gst_bus_pop (bus)))
303       gst_message_unref (message);
304   } else {
305     GST_FLAG_UNSET (bus, GST_BUS_FLUSHING);
306   }
307
308   GST_UNLOCK (bus);
309 }
310
311 /**
312  * gst_bus_pop:
313  * @bus: a #GstBus to pop
314  *
315  * Get a message from the bus.
316  *
317  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
318  *
319  * MT safe.
320  */
321 GstMessage *
322 gst_bus_pop (GstBus * bus)
323 {
324   GstMessage *message;
325
326   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
327
328   g_mutex_lock (bus->queue_lock);
329   message = g_queue_pop_head (bus->queue);
330   g_mutex_unlock (bus->queue_lock);
331
332   return message;
333 }
334
335 /**
336  * gst_bus_peek:
337  * @bus: a #GstBus
338  *
339  * Peek the message on the top of the bus' queue. The bus maintains ownership of
340  * the message, and the message will remain on the bus' message queue.
341  *
342  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
343  *
344  * MT safe.
345  */
346 GstMessage *
347 gst_bus_peek (GstBus * bus)
348 {
349   GstMessage *message;
350
351   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
352
353   g_mutex_lock (bus->queue_lock);
354   message = g_queue_peek_head (bus->queue);
355   g_mutex_unlock (bus->queue_lock);
356
357   return message;
358 }
359
360 /**
361  * gst_bus_set_sync_handler:
362  * @bus: a #GstBus to install the handler on
363  * @func: The handler function to install
364  * @data: User data that will be sent to the handler function.
365  *
366  * Install a synchronous handler on the bus. The function will be called
367  * every time a new message is posted on the bus. Note that the function
368  * will be called in the same thread context as the posting object.
369  */
370 void
371 gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
372 {
373   g_return_if_fail (GST_IS_BUS (bus));
374
375   GST_LOCK (bus);
376   bus->sync_handler = func;
377   bus->sync_handler_data = data;
378   GST_UNLOCK (bus);
379 }
380
381 /* GSource for the bus
382  */
383 typedef struct
384 {
385   GSource source;
386   GstBus *bus;
387 } GstBusSource;
388
389 gboolean
390 gst_bus_source_prepare (GSource * source, gint * timeout)
391 {
392   *timeout = -1;
393   return gst_bus_have_pending (((GstBusSource *) source)->bus);
394 }
395
396 gboolean
397 gst_bus_source_check (GSource * source)
398 {
399   return gst_bus_have_pending (((GstBusSource *) source)->bus);
400 }
401
402 gboolean
403 gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
404     gpointer user_data)
405 {
406   GstBusHandler handler = (GstBusHandler) callback;
407   GstBusSource *bsource = (GstBusSource *) source;
408   GstMessage *message;
409   gboolean needs_pop = TRUE;
410
411   g_return_val_if_fail (GST_IS_BUS (bsource->bus), FALSE);
412
413   message = gst_bus_peek (bsource->bus);
414
415   g_return_val_if_fail (message != NULL, TRUE);
416
417   if (!handler) {
418     g_warning ("GstBus watch dispatched without callback\n"
419         "You must call g_source_connect().");
420     return FALSE;
421   }
422
423   needs_pop = handler (bsource->bus, message, user_data);
424
425   if (needs_pop)
426     gst_message_unref (gst_bus_pop (bsource->bus));
427
428   return TRUE;
429 }
430
431 void
432 gst_bus_source_finalize (GSource * source)
433 {
434   GstBusSource *bsource = (GstBusSource *) source;
435
436   gst_object_unref (GST_OBJECT_CAST (bsource->bus));
437   bsource->bus = NULL;
438 }
439
440 static GSourceFuncs gst_bus_source_funcs = {
441   gst_bus_source_prepare,
442   gst_bus_source_check,
443   gst_bus_source_dispatch,
444   gst_bus_source_finalize
445 };
446
447 /**
448  * gst_bus_create_watch:
449  * @bus: a #GstBus to create the watch for
450  *
451  * Create watch for this bus. 
452  *
453  * Returns: A #GSource that can be added to a mainloop.
454  */
455 GSource *
456 gst_bus_create_watch (GstBus * bus)
457 {
458   GstBusSource *source;
459
460   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
461
462   source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
463       sizeof (GstBusSource));
464   gst_object_ref (GST_OBJECT_CAST (bus));
465   source->bus = bus;
466
467   return (GSource *) source;
468 }
469
470 /**
471  * gst_bus_add_watch_full:
472  * @bus: a #GstBus to create the watch for.
473  * @priority: The priority of the watch.
474  * @handler: A function to call when a message is received.
475  * @user_data: user data passed to @handler.
476  * @notify: the function to call when the source is removed.
477  *
478  * Adds the bus to the mainloop with the given priority. If the handler returns
479  * TRUE, the message will then be popped off the queue. When the handler is
480  * called, the message belongs to the caller; if you want to keep a copy of it,
481  * call gst_message_ref before leaving the handler.
482  *
483  * Returns: The event source id.
484  *
485  * MT safe.
486  */
487 guint
488 gst_bus_add_watch_full (GstBus * bus, gint priority,
489     GstBusHandler handler, gpointer user_data, GDestroyNotify notify)
490 {
491   guint id;
492   GSource *source;
493
494   g_return_val_if_fail (GST_IS_BUS (bus), 0);
495
496   source = gst_bus_create_watch (bus);
497
498   if (priority != G_PRIORITY_DEFAULT)
499     g_source_set_priority (source, priority);
500
501   g_source_set_callback (source, (GSourceFunc) handler, user_data, notify);
502
503   id = g_source_attach (source, NULL);
504   g_source_unref (source);
505
506   return id;
507 }
508
509 /**
510  * gst_bus_add_watch:
511  * @bus: a #GstBus to create the watch for
512  * @handler: A function to call when a message is received.
513  * @user_data: user data passed to @handler.
514  *
515  * Adds the bus to the mainloop with the default priority.
516  *
517  * Returns: The event source id.
518  *
519  * MT safe.
520  */
521 guint
522 gst_bus_add_watch (GstBus * bus, GstBusHandler handler, gpointer user_data)
523 {
524   return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, handler, user_data,
525       NULL);
526 }
527
528 typedef struct
529 {
530   GMainLoop *loop;
531   guint timeout_id;
532   GstMessageType events;
533   GstMessageType revent;
534 } GstBusPollData;
535
536 static gboolean
537 poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
538 {
539   if (GST_MESSAGE_TYPE (message) & poll_data->events) {
540     poll_data->revent = GST_MESSAGE_TYPE (message);
541     if (g_main_loop_is_running (poll_data->loop))
542       g_main_loop_quit (poll_data->loop);
543     /* keep the message on the queue */
544     return FALSE;
545   } else {
546     /* pop and unref the message */
547     return TRUE;
548   }
549 }
550
551 static gboolean
552 poll_timeout (GstBusPollData * poll_data)
553 {
554   poll_data->timeout_id = 0;
555   g_main_loop_quit (poll_data->loop);
556   /* returning FALSE will remove the source id */
557   return FALSE;
558 }
559
560 /**
561  * gst_bus_poll:
562  * @bus: a #GstBus
563  * @events: a mask of #GstMessageType, representing the set of message types to
564  * poll for.
565  * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely.
566  *
567  * Poll the bus for events. Will block while waiting for events to come. You can
568  * specify a maximum time to poll with the @timeout parameter. If @timeout is
569  * negative, this function will block indefinitely.
570  *
571  * Returns: The type of the message that was received, or GST_MESSAGE_UNKNOWN if
572  * the poll timed out. The message will remain in the bus queue; you will need
573  * to gst_bus_pop() it off before entering gst_bus_poll() again.
574  */
575 GstMessageType
576 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
577 {
578   GstBusPollData *poll_data;
579   GstMessageType ret;
580   guint id;
581
582   poll_data = g_new0 (GstBusPollData, 1);
583   if (timeout >= 0)
584     poll_data->timeout_id = g_timeout_add (timeout / GST_MSECOND,
585         (GSourceFunc) poll_timeout, poll_data);
586   poll_data->loop = g_main_loop_new (NULL, FALSE);
587   poll_data->events = events;
588   poll_data->revent = GST_MESSAGE_UNKNOWN;
589
590   id = gst_bus_add_watch (bus, (GstBusHandler) poll_handler, poll_data);
591   g_main_loop_run (poll_data->loop);
592   g_source_remove (id);
593
594   ret = poll_data->revent;
595
596   if (poll_data->timeout_id)
597     g_source_remove (poll_data->timeout_id);
598   g_main_loop_unref (poll_data->loop);
599   g_free (poll_data);
600
601   return ret;
602 }