Next big merge.
[platform/upstream/gstreamer.git] / gst / gstbus.c
1 /* GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  *
4  * gstbus.c: GstBus subsystem
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22
23 #include <errno.h>
24 #include <unistd.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27
28 #include "gst_private.h"
29 #include "gstinfo.h"
30
31 #include "gstbus.h"
32
33 enum
34 {
35   ARG_0,
36 };
37
38 static void gst_bus_class_init (GstBusClass * klass);
39 static void gst_bus_init (GstBus * bus);
40 static void gst_bus_dispose (GObject * object);
41
42 static void gst_bus_set_property (GObject * object, guint prop_id,
43     const GValue * value, GParamSpec * pspec);
44 static void gst_bus_get_property (GObject * object, guint prop_id,
45     GValue * value, GParamSpec * pspec);
46
47 static GstObjectClass *parent_class = NULL;
48
49 /* static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; */
50
51 GType
52 gst_bus_get_type (void)
53 {
54   static GType bus_type = 0;
55
56   if (!bus_type) {
57     static const GTypeInfo bus_info = {
58       sizeof (GstBusClass),
59       NULL,
60       NULL,
61       (GClassInitFunc) gst_bus_class_init,
62       NULL,
63       NULL,
64       sizeof (GstBus),
65       0,
66       (GInstanceInitFunc) gst_bus_init,
67       NULL
68     };
69
70     bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
71   }
72   return bus_type;
73 }
74
75 static void
76 gst_bus_class_init (GstBusClass * klass)
77 {
78   GObjectClass *gobject_class;
79   GstObjectClass *gstobject_class;
80
81   gobject_class = (GObjectClass *) klass;
82   gstobject_class = (GstObjectClass *) klass;
83
84   parent_class = g_type_class_ref (GST_TYPE_OBJECT);
85
86   if (!g_thread_supported ())
87     g_thread_init (NULL);
88
89   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
90   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
91   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
92 }
93
94 static void
95 gst_bus_init (GstBus * bus)
96 {
97   bus->queue = g_queue_new ();
98   bus->queue_lock = g_mutex_new ();
99
100   if (socketpair (PF_UNIX, SOCK_STREAM, 0, bus->control_socket) < 0)
101     goto no_socketpair;
102
103   bus->io_channel = g_io_channel_unix_new (bus->control_socket[0]);
104
105   return;
106
107   /* errors */
108 no_socketpair:
109   {
110     g_warning ("cannot create io channel");
111     bus->io_channel = NULL;
112   }
113 }
114
115 static void
116 gst_bus_dispose (GObject * object)
117 {
118   GstBus *bus;
119
120   bus = GST_BUS (object);
121
122   if (bus->io_channel) {
123     g_io_channel_shutdown (bus->io_channel, TRUE, NULL);
124     g_io_channel_unref (bus->io_channel);
125     bus->io_channel = NULL;
126   }
127   close (bus->control_socket[0]);
128   close (bus->control_socket[1]);
129
130   if (bus->queue) {
131     g_mutex_lock (bus->queue_lock);
132     g_queue_free (bus->queue);
133     bus->queue = NULL;
134     g_mutex_unlock (bus->queue_lock);
135     g_mutex_free (bus->queue_lock);
136     bus->queue_lock = NULL;
137   }
138
139   G_OBJECT_CLASS (parent_class)->dispose (object);
140 }
141
142 static void
143 gst_bus_set_property (GObject * object, guint prop_id,
144     const GValue * value, GParamSpec * pspec)
145 {
146   GstBus *bus;
147
148   bus = GST_BUS (object);
149
150   switch (prop_id) {
151     default:
152       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
153       break;
154   }
155 }
156
157 static void
158 gst_bus_get_property (GObject * object, guint prop_id,
159     GValue * value, GParamSpec * pspec)
160 {
161   GstBus *bus;
162
163   bus = GST_BUS (object);
164
165   switch (prop_id) {
166     default:
167       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
168       break;
169   }
170 }
171
172 GstBus *
173 gst_bus_new (void)
174 {
175   GstBus *result;
176
177   result = g_object_new (gst_bus_get_type (), NULL);
178
179   return result;
180 }
181
182 /**
183  * gst_bus_post:
184  * @bus: a #GstBus to post on
185  * @message: The #GstMessage to post
186  *
187  * Post a message on the given bus.
188  *
189  * Returns: TRUE if the message could be posted.
190  *
191  * MT safe.
192  */
193 gboolean
194 gst_bus_post (GstBus * bus, GstMessage * message)
195 {
196   gchar c;
197   GstBusSyncReply reply = GST_BUS_PASS;
198   GstBusSyncHandler handler;
199   gpointer handler_data;
200   gboolean need_write = FALSE;
201   ssize_t write_ret = -1;
202
203   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
204   g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
205
206   //g_print ("posting message on bus, type %d\n", GST_MESSAGE_TYPE (message));
207   GST_DEBUG_OBJECT (bus, "posting message on bus");
208
209   GST_LOCK (bus);
210   handler = bus->sync_handler;
211   handler_data = bus->sync_handler_data;
212   GST_UNLOCK (bus);
213
214   /* first call the sync handler if it is installed */
215   if (handler) {
216     reply = handler (bus, message, handler_data);
217   }
218
219   /* now see what we should do with the message */
220   switch (reply) {
221     case GST_BUS_DROP:
222       /* drop the message */
223       break;
224     case GST_BUS_PASS:
225       /* pass the message to the async queue */
226       g_mutex_lock (bus->queue_lock);
227       if (g_queue_get_length (bus->queue) == 0)
228         need_write = TRUE;
229       g_queue_push_tail (bus->queue, message);
230       g_mutex_unlock (bus->queue_lock);
231
232       if (g_queue_get_length (bus->queue) == 0)
233         need_write = TRUE;
234       if (need_write) {
235         c = 'p';
236         errno = EAGAIN;
237         while (write_ret == -1) {
238           switch (errno) {
239             case EAGAIN:
240             case EINTR:
241               break;
242             default:
243               perror ("gst_bus_post: could not write to fd");
244               return FALSE;
245           }
246           write_ret = write (bus->control_socket[1], &c, 1);
247         }
248       }
249       break;
250     case GST_BUS_ASYNC:
251     {
252       /* async delivery, we need a mutex and a cond to block
253        * on */
254       GMutex *lock = g_mutex_new ();
255       GCond *cond = g_cond_new ();
256
257       GST_MESSAGE_COND (message) = cond;
258       GST_MESSAGE_GET_LOCK (message) = lock;
259
260       GST_DEBUG ("waiting for async delivery of message %p", message);
261
262       /* now we lock the message mutex, send the message to the async
263        * queue. When the message is handled by the app and destroyed, 
264        * the cond will be signalled and we can continue */
265       g_mutex_lock (lock);
266       g_mutex_lock (bus->queue_lock);
267       if (g_queue_get_length (bus->queue) == 0)
268         need_write = TRUE;
269       g_queue_push_tail (bus->queue, message);
270       g_mutex_unlock (bus->queue_lock);
271
272       if (need_write) {
273         c = 'p';
274         errno = EAGAIN;
275         while (write_ret == -1) {
276           switch (errno) {
277             case EAGAIN:
278             case EINTR:
279               break;
280             default:
281               perror ("gst_bus_post: could not write to fd");
282               return FALSE;
283           }
284           write_ret = write (bus->control_socket[1], &c, 1);
285         }
286       }
287
288       /* now block till the message is freed */
289       g_cond_wait (cond, lock);
290       g_mutex_unlock (lock);
291
292       GST_DEBUG ("message %p delivered asynchronously", message);
293
294       g_mutex_free (lock);
295       g_cond_free (cond);
296       break;
297     }
298   }
299
300   return TRUE;
301 }
302
303 /**
304  * gst_bus_have_pending:
305  * @bus: a #GstBus to check
306  *
307  * Check if there are pending messages on the bus that should be 
308  * handled.
309  *
310  * Returns: TRUE if there are messages on the bus to be handled.
311  *
312  * MT safe.
313  */
314 gboolean
315 gst_bus_have_pending (GstBus * bus)
316 {
317   gint length;
318
319   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
320
321   g_mutex_lock (bus->queue_lock);
322   length = g_queue_get_length (bus->queue);
323   g_mutex_unlock (bus->queue_lock);
324
325   return (length > 0);
326 }
327
328 /**
329  * gst_bus_pop:
330  * @bus: a #GstBus to pop
331  *
332  * Get a message from the bus.
333  *
334  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
335  *
336  * MT safe.
337  */
338 GstMessage *
339 gst_bus_pop (GstBus * bus)
340 {
341   GstMessage *message;
342   gboolean needs_read = FALSE;
343
344   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
345
346   g_mutex_lock (bus->queue_lock);
347   message = g_queue_pop_head (bus->queue);
348   if (message && g_queue_get_length (bus->queue) == 0)
349     needs_read = TRUE;
350   g_mutex_unlock (bus->queue_lock);
351
352   if (needs_read) {
353     gchar c;
354     ssize_t read_ret = -1;
355
356     /* the char in the fd is essentially just a way to wake us up. read it off so
357        we're not woken up again. */
358     errno = EAGAIN;
359     while (read_ret == -1) {
360       switch (errno) {
361         case EAGAIN:
362         case EINTR:
363           break;
364         default:
365           perror ("gst_bus_pop: could not read from fd");
366           return NULL;
367       }
368       read_ret = read (bus->control_socket[0], &c, 1);
369     }
370   }
371
372   return message;
373 }
374
375 /**
376  * gst_bus_peek:
377  * @bus: a #GstBus
378  *
379  * Peek the message on the top of the bus' queue. The bus maintains ownership of
380  * the message, and the message will remain on the bus' message queue.
381  *
382  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
383  *
384  * MT safe.
385  */
386 GstMessage *
387 gst_bus_peek (GstBus * bus)
388 {
389   GstMessage *message;
390
391   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
392
393   g_mutex_lock (bus->queue_lock);
394   message = g_queue_peek_head (bus->queue);
395   g_mutex_unlock (bus->queue_lock);
396
397   return message;
398 }
399
400 /**
401  * gst_bus_set_sync_handler:
402  * @bus: a #GstBus to install the handler on
403  * @func: The handler function to install
404  * @data: User data that will be sent to the handler function.
405  *
406  * Install a synchronous handler on the bus. The function will be called
407  * every time a new message is posted on the bus. Note that the function
408  * will be called in the same thread context as the posting object.
409  */
410 void
411 gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
412 {
413   g_return_if_fail (GST_IS_BUS (bus));
414
415   GST_LOCK (bus);
416   bus->sync_handler = func;
417   bus->sync_handler_data = data;
418   GST_UNLOCK (bus);
419 }
420
421 /**
422  * gst_bus_create_watch:
423  * @bus: a #GstBus to create the watch for
424  *
425  * Create watch for this bus. 
426  *
427  * Returns: A #GSource that can be added to a mainloop.
428  */
429 GSource *
430 gst_bus_create_watch (GstBus * bus)
431 {
432   GSource *source;
433
434   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
435
436   /* FIXME, we need to ref the bus and unref it when the source
437    * is destroyed */
438   source = g_io_create_watch (bus->io_channel, G_IO_IN);
439
440   return source;
441 }
442
443 typedef struct
444 {
445   GSource *source;
446   GstBus *bus;
447   gint priority;
448   GstBusHandler handler;
449   gpointer user_data;
450   GDestroyNotify notify;
451 } GstBusWatch;
452
453 static gboolean
454 bus_watch_callback (GIOChannel * channel, GIOCondition cond,
455     GstBusWatch * watch)
456 {
457   GstMessage *message;
458   gboolean needs_pop = TRUE;
459
460   g_return_val_if_fail (GST_IS_BUS (watch->bus), FALSE);
461
462   message = gst_bus_peek (watch->bus);
463
464   g_return_val_if_fail (message != NULL, TRUE);
465
466   if (watch->handler)
467     needs_pop = watch->handler (watch->bus, message, watch->user_data);
468
469   if (needs_pop)
470     gst_message_unref (gst_bus_pop (watch->bus));
471
472   return TRUE;
473 }
474
475 static void
476 bus_watch_destroy (GstBusWatch * watch)
477 {
478   if (watch->notify) {
479     watch->notify (watch->user_data);
480   }
481   gst_object_unref (GST_OBJECT_CAST (watch->bus));
482   g_free (watch);
483 }
484
485 /**
486  * gst_bus_add_watch_full:
487  * @bus: a #GstBus to create the watch for
488  * @handler: A function to call when a message is received.
489  *
490  * Adds the bus to the mainloop with the given priority. If the handler returns
491  * TRUE, the message will then be popped off the queue. When the handler is
492  * called, the message belongs to the caller; if you want to keep a copy of it,
493  * call gst_message_ref before leaving the handler.
494  *
495  * Returns: The event source id.
496  *
497  * MT safe.
498  */
499 guint
500 gst_bus_add_watch_full (GstBus * bus, gint priority,
501     GstBusHandler handler, gpointer user_data, GDestroyNotify notify)
502 {
503   guint id;
504   GstBusWatch *watch;
505
506   g_return_val_if_fail (GST_IS_BUS (bus), 0);
507
508   watch = g_new (GstBusWatch, 1);
509
510   gst_object_ref (GST_OBJECT_CAST (bus));
511   watch->source = gst_bus_create_watch (bus);
512   watch->bus = bus;
513   watch->priority = priority;
514   watch->handler = handler;
515   watch->user_data = user_data;
516   watch->notify = notify;
517
518   if (priority != G_PRIORITY_DEFAULT)
519     g_source_set_priority (watch->source, priority);
520
521   g_source_set_callback (watch->source, (GSourceFunc) bus_watch_callback,
522       watch, (GDestroyNotify) bus_watch_destroy);
523
524   id = g_source_attach (watch->source, NULL);
525   g_source_unref (watch->source);
526
527   return id;
528 }
529
530 /**
531  * gst_bus_add_watch:
532  * @bus: a #GstBus to create the watch for
533  *
534  * Adds the bus to the mainloop with the default priority.
535  *
536  * Returns: The event source id.
537  *
538  * MT safe.
539  */
540 guint
541 gst_bus_add_watch (GstBus * bus, GstBusHandler handler, gpointer user_data)
542 {
543   return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, handler, user_data,
544       NULL);
545 }
546
547 typedef struct
548 {
549   GMainLoop *loop;
550   guint timeout_id;
551   GstMessageType events;
552   GstMessageType revent;
553 } GstBusPollData;
554
555 static gboolean
556 poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
557 {
558   if (GST_MESSAGE_TYPE (message) & poll_data->events) {
559     poll_data->revent = GST_MESSAGE_TYPE (message);
560     if (g_main_loop_is_running (poll_data->loop))
561       g_main_loop_quit (poll_data->loop);
562     /* keep the message on the queue */
563     return FALSE;
564   } else {
565     /* pop and unref the message */
566     return TRUE;
567   }
568 }
569
570 static gboolean
571 poll_timeout (GstBusPollData * poll_data)
572 {
573   poll_data->timeout_id = 0;
574   g_main_loop_quit (poll_data->loop);
575   /* returning FALSE will remove the source id */
576   return FALSE;
577 }
578
579 /**
580  * gst_bus_poll:
581  * @bus: a #GstBus
582  * @events: a mask of #GstMessageType, representing the set of message types to
583  * poll for.
584  * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely.
585  *
586  * Poll the bus for events. Will block while waiting for events to come. You can
587  * specify a maximum time to poll with the @timeout parameter. If @timeout is
588  * negative, this function will block indefinitely.
589  *
590  * Returns: The type of the message that was received, or GST_MESSAGE_UNKNOWN if
591  * the poll timed out. The message will remain in the bus queue; you will need
592  * to gst_bus_pop() it off before entering gst_bus_poll() again.
593  */
594 GstMessageType
595 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
596 {
597   GstBusPollData *poll_data;
598   GstMessageType ret;
599   guint id;
600
601   poll_data = g_new0 (GstBusPollData, 1);
602   if (timeout >= 0)
603     poll_data->timeout_id = g_timeout_add (timeout / GST_MSECOND,
604         (GSourceFunc) poll_timeout, poll_data);
605   poll_data->loop = g_main_loop_new (NULL, FALSE);
606   poll_data->events = events;
607   poll_data->revent = GST_MESSAGE_UNKNOWN;
608
609   id = gst_bus_add_watch (bus, (GstBusHandler) poll_handler, poll_data);
610   g_main_loop_run (poll_data->loop);
611   g_source_remove (id);
612
613   ret = poll_data->revent;
614
615   if (poll_data->timeout_id)
616     g_source_remove (poll_data->timeout_id);
617   g_main_loop_unref (poll_data->loop);
618   g_free (poll_data);
619
620   return ret;
621 }