X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Fgstbus.c;h=cddaf390b864f64c44a1839b19e57afe6feb1f38;hb=e6bd5b41935f125bf43e030dcb909c3537d33b31;hp=b9ce67bc8f45122b7d583a9d7dc9f94253471bb3;hpb=c8b512d2f0a468eba940bede3c7d9ce485a2176a;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/gstbus.c b/gst/gstbus.c index b9ce67b..cddaf39 100644 --- a/gst/gstbus.c +++ b/gst/gstbus.c @@ -21,6 +21,7 @@ /** * SECTION:gstbus + * @title: GstBus * @short_description: Asynchronous message bus subsystem * @see_also: #GstMessage, #GstElement * @@ -79,6 +80,12 @@ #include "gstbus.h" #include "glib-compat-private.h" +#ifdef G_OS_WIN32 +# ifndef EWOULDBLOCK +# define EWOULDBLOCK EAGAIN /* This is just to placate gcc */ +# endif +#endif /* G_OS_WIN32 */ + #define GST_CAT_DEFAULT GST_CAT_BUS /* bus signals */ enum @@ -122,7 +129,7 @@ struct _GstBusPrivate }; #define gst_bus_parent_class parent_class -G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT); +G_DEFINE_TYPE_WITH_PRIVATE (GstBus, gst_bus, GST_TYPE_OBJECT); static void gst_bus_set_property (GObject * object, @@ -149,6 +156,8 @@ gst_bus_constructed (GObject * object) bus->priv->poll = gst_poll_new_timer (); gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd); } + + G_OBJECT_CLASS (gst_bus_parent_class)->constructed (object); } static void @@ -208,21 +217,16 @@ gst_bus_class_init (GstBusClass * klass) G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED, G_STRUCT_OFFSET (GstBusClass, message), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_MESSAGE); - - g_type_class_add_private (klass, sizeof (GstBusPrivate)); } static void gst_bus_init (GstBus * bus) { - bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate); + bus->priv = gst_bus_get_instance_private (bus); bus->priv->enable_async = DEFAULT_ENABLE_ASYNC; g_mutex_init (&bus->priv->queue_lock); bus->priv->queue = gst_atomic_queue_new (32); - /* clear floating flag */ - gst_object_ref_sink (bus); - GST_DEBUG_OBJECT (bus, "created"); } @@ -276,9 +280,12 @@ gst_bus_new (void) { GstBus *result; - result = g_object_newv (gst_bus_get_type (), 0, NULL); + result = g_object_new (gst_bus_get_type (), NULL); GST_DEBUG_OBJECT (result, "created new bus"); + /* clear floating flag */ + gst_object_ref_sink (result); + return result; } @@ -308,6 +315,10 @@ gst_bus_post (GstBus * bus, GstMessage * message) GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus %" GST_PTR_FORMAT, message, message); + /* check we didn't accidentally add a public flag that maps to same value */ + g_assert (!GST_MINI_OBJECT_FLAG_IS_SET (message, + GST_MESSAGE_FLAG_ASYNC_DELIVERY)); + GST_OBJECT_LOCK (bus); /* check if the bus is flushing */ if (GST_OBJECT_FLAG_IS_SET (bus, GST_BUS_FLUSHING)) @@ -357,6 +368,8 @@ gst_bus_post (GstBus * bus, GstMessage * message) g_cond_init (cond); g_mutex_init (lock); + GST_MINI_OBJECT_FLAG_SET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY); + GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message); /* now we lock the message mutex, send the message to the async @@ -369,12 +382,18 @@ gst_bus_post (GstBus * bus, GstMessage * message) /* now block till the message is freed */ g_cond_wait (cond, lock); + + /* we acquired a new ref from gst_message_dispose() so we can clean up */ g_mutex_unlock (lock); GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message); + GST_MINI_OBJECT_FLAG_UNSET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY); + g_mutex_clear (lock); g_cond_clear (cond); + + gst_message_unref (message); break; } default: @@ -387,8 +406,8 @@ gst_bus_post (GstBus * bus, GstMessage * message) is_flushing: { GST_DEBUG_OBJECT (bus, "bus is flushing"); - gst_message_unref (message); GST_OBJECT_UNLOCK (bus); + gst_message_unref (message); return FALSE; } @@ -434,6 +453,9 @@ void gst_bus_set_flushing (GstBus * bus, gboolean flushing) { GstMessage *message; + GList *message_list = NULL; + + g_return_if_fail (GST_IS_BUS (bus)); GST_OBJECT_LOCK (bus); @@ -443,13 +465,15 @@ gst_bus_set_flushing (GstBus * bus, gboolean flushing) GST_DEBUG_OBJECT (bus, "set bus flushing"); while ((message = gst_bus_pop (bus))) - gst_message_unref (message); + message_list = g_list_prepend (message_list, message); } else { GST_DEBUG_OBJECT (bus, "unset bus flushing"); GST_OBJECT_FLAG_UNSET (bus, GST_BUS_FLUSHING); } GST_OBJECT_UNLOCK (bus); + + g_list_free_full (message_list, (GDestroyNotify) gst_message_unref); } /** @@ -496,8 +520,24 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, gst_atomic_queue_length (bus->priv->queue)); while ((message = gst_atomic_queue_pop (bus->priv->queue))) { - if (bus->priv->poll) - gst_poll_read_control (bus->priv->poll); + if (bus->priv->poll) { + while (!gst_poll_read_control (bus->priv->poll)) { + if (errno == EWOULDBLOCK) { + /* Retry, this can happen if pushing to the queue has finished, + * popping here succeeded but writing control did not finish + * before we got to this line. */ + /* Give other threads the chance to do something */ + g_thread_yield (); + continue; + } else { + /* This is a real error and means that either the bus is in an + * inconsistent state, or the GstPoll is invalid. GstPoll already + * prints a critical warning about this, no need to do that again + * ourselves */ + break; + } + } + } GST_DEBUG_OBJECT (bus, "got message %p, %s from %s, type mask is %u", message, GST_MESSAGE_TYPE_NAME (message), @@ -505,7 +545,7 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, if ((GST_MESSAGE_TYPE (message) & types) != 0) { /* Extra check to ensure extended types don't get matched unless * asked for */ - if ((GST_MESSAGE_TYPE_IS_EXTENDED (message) == FALSE) + if ((!GST_MESSAGE_TYPE_IS_EXTENDED (message)) || (types & GST_MESSAGE_EXTENDED)) { /* exit the loop, we have a message */ goto beach; @@ -719,6 +759,31 @@ no_replace: } } +/** + * gst_bus_get_pollfd: + * @bus: A #GstBus + * @fd: (out): A GPollFD to fill + * + * Gets the file descriptor from the bus which can be used to get notified about + * messages being available with functions like g_poll(), and allows integration + * into other event loops based on file descriptors. + * Whenever a message is available, the POLLIN / %G_IO_IN event is set. + * + * Warning: NEVER read or write anything to the returned fd but only use it + * for getting notifications via g_poll() or similar and then use the normal + * GstBus API, e.g. gst_bus_pop(). + * + * Since: 1.14 + */ +void +gst_bus_get_pollfd (GstBus * bus, GPollFD * fd) +{ + g_return_if_fail (GST_IS_BUS (bus)); + g_return_if_fail (bus->priv->poll != NULL); + + *fd = bus->priv->pollfd; +} + /* GSource for the bus */ typedef struct @@ -821,7 +886,7 @@ static GSourceFuncs gst_bus_source_funcs = { * a message is on the bus. After the GSource is dispatched, the * message is popped off the bus and unreffed. * - * Returns: (transfer full): a #GSource that can be added to a mainloop. + * Returns: (transfer full) (nullable): a #GSource that can be added to a mainloop. */ GSource * gst_bus_create_watch (GstBus * bus) @@ -858,6 +923,10 @@ gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority, } source = gst_bus_create_watch (bus); + if (!source) { + g_critical ("Creating bus watch failed"); + return 0; + } if (priority != G_PRIORITY_DEFAULT) g_source_set_priority (source, priority); @@ -877,7 +946,7 @@ gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority, } /** - * gst_bus_add_watch_full: + * gst_bus_add_watch_full: (rename-to gst_bus_add_watch) * @bus: a #GstBus to create the watch for. * @priority: The priority of the watch. * @func: A function to call when a message is received. @@ -894,6 +963,8 @@ gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority, * There can only be a single bus watch per bus, you must remove it before you * can set a new one. * + * The bus watch will only work if a GLib main loop is being run. + * * When @func is called, the message belongs to the caller; if you want to * keep a copy of it, call gst_message_ref() before leaving @func. * @@ -901,10 +972,12 @@ gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority, * from @func. If the watch was added to the default main context it is also * possible to remove the watch using g_source_remove(). * + * The bus watch will take its own reference to the @bus, so it is safe to unref + * @bus using gst_object_unref() after setting the bus watch. + * * MT safe. * * Returns: The event source id or 0 if @bus already got an event source. - * Rename to: gst_bus_add_watch */ guint gst_bus_add_watch_full (GstBus * bus, gint priority, @@ -937,13 +1010,18 @@ gst_bus_add_watch_full (GstBus * bus, gint priority, * There can only be a single bus watch per bus, you must remove it before you * can set a new one. * + * The bus watch will only work if a GLib main loop is being run. + * * The watch can be removed using gst_bus_remove_watch() or by returning %FALSE * from @func. If the watch was added to the default main context it is also * possible to remove the watch using g_source_remove(). * - * Returns: The event source id or 0 if @bus already got an event source. + * The bus watch will take its own reference to the @bus, so it is safe to unref + * @bus using gst_object_unref() after setting the bus watch. * * MT safe. + * + * Returns: The event source id or 0 if @bus already got an event source. */ guint gst_bus_add_watch (GstBus * bus, GstBusFunc func, gpointer user_data) @@ -966,7 +1044,7 @@ gst_bus_add_watch (GstBus * bus, GstBusFunc func, gpointer user_data) gboolean gst_bus_remove_watch (GstBus * bus) { - GSource *watch_id; + GSource *source; g_return_val_if_fail (GST_IS_BUS (bus), FALSE); @@ -974,18 +1052,28 @@ gst_bus_remove_watch (GstBus * bus) if (bus->priv->signal_watch == NULL) { GST_ERROR_OBJECT (bus, "no bus watch was present"); - goto no_watch; + goto error; + } + + if (bus->priv->num_signal_watchers > 0) { + GST_ERROR_OBJECT (bus, + "trying to remove signal watch with gst_bus_remove_watch()"); + goto error; } - watch_id = bus->priv->signal_watch; + source = + bus->priv->signal_watch ? g_source_ref (bus->priv->signal_watch) : NULL; GST_OBJECT_UNLOCK (bus); - g_source_destroy (watch_id); + if (source) { + g_source_destroy (source); + g_source_unref (source); + } return TRUE; -no_watch: +error: GST_OBJECT_UNLOCK (bus); return FALSE; @@ -1108,6 +1196,8 @@ gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTime timeout) GstMessage *ret; gulong id; + g_return_val_if_fail (GST_IS_BUS (bus), NULL); + poll_data = g_slice_new (GstBusPollData); poll_data->source_running = TRUE; poll_data->loop = g_main_loop_new (NULL, FALSE); @@ -1373,13 +1463,16 @@ gst_bus_remove_signal_watch (GstBus * bus) GST_DEBUG_OBJECT (bus, "removing signal watch %u", g_source_get_id (bus->priv->signal_watch)); - source = bus->priv->signal_watch; + source = + bus->priv->signal_watch ? g_source_ref (bus->priv->signal_watch) : NULL; done: GST_OBJECT_UNLOCK (bus); - if (source) + if (source) { g_source_destroy (source); + g_source_unref (source); + } return;