nle: Stop using a MainContext avoiding needing one iter per source dispach
authorThibault Saunier <tsaunier@gnome.org>
Wed, 20 Aug 2014 11:15:30 +0000 (13:15 +0200)
committerThibault Saunier <tsaunier@gnome.org>
Fri, 31 Oct 2014 10:58:12 +0000 (11:58 +0100)
Using GClosure to handle the source handling and handle our action
ordering ourselves

https://bugzilla.gnome.org/show_bug.cgi?id=733342

ges/nle/nlecomposition.c

index 1dc7635..457f576 100644 (file)
@@ -155,14 +155,10 @@ struct _NleCompositionPrivate
   GstPadEventFunction nle_event_pad_func;
   gboolean send_stream_start;
 
-  GMainContext *mcontext;
-  /* Ensure that when we remove all sources from the maincontext
-   * we can not add any source, avoiding:
-   * "g_source_attach: assertion '!SOURCE_DESTROYED (source)' failed" */
-  GMutex mcontext_lock;
-  GList *gsources;
-  GList *update_gsources;
-  GList *seek_gsources;
+  /* Protect the actions list */
+  GMutex actions_lock;
+  GCond actions_cond;
+  GList *actions;
 
   gboolean running;
   gboolean initialized;
@@ -183,6 +179,14 @@ struct _NleCompositionPrivate
   gboolean tearing_down_stack;
 };
 
+typedef struct _Action
+{
+  GCClosure closure;
+  gint priority;
+} Action;
+
+#define ACTION_CALLBACK(__action) (((GCClosure*) (__action))->callback)
+
 static guint _signals[LAST_SIGNAL] = { 0 };
 
 static GParamSpec *nleobject_properties[NLEOBJECT_PROP_LAST];
@@ -224,20 +228,25 @@ nle_composition_event_handler (GstPad * ghostpad, GstObject * parent,
     GstEvent * event);
 static void _relink_single_node (NleComposition * comp, GNode * node,
     GstEvent * toplevel_seek);
-static gboolean _update_pipeline_func (UpdateCompositionData * ucompo);
-static gboolean _commit_func (UpdateCompositionData * ucompo);
+static void _update_pipeline_func (NleComposition * comp,
+    UpdateCompositionData * ucompo);
+static void _commit_func (NleComposition * comp,
+    UpdateCompositionData * ucompo);
 static GstEvent *get_new_seek_event (NleComposition * comp, gboolean initial,
     gboolean updatestoponly);
-static gboolean
-_nle_composition_add_object (NleComposition * comp, NleObject * object);
-static gboolean
-_nle_composition_remove_object (NleComposition * comp, NleObject * object);
+static gboolean _nle_composition_add_object (NleComposition * comp,
+    NleObject * object);
+static gboolean _nle_composition_remove_object (NleComposition * comp,
+    NleObject * object);
 static void _deactivate_stack (NleComposition * comp,
     gboolean flush_downstream);
 static gboolean _set_real_eos_seqnum_from_seek (NleComposition * comp,
     GstEvent * event);
-static gboolean _emit_commited_signal_func (NleComposition * comp);
+static void _emit_commited_signal_func (NleComposition * comp, gpointer udata);
 static void _restart_task (UpdateCompositionData * ucompo);
+static void
+_add_action (NleComposition * comp, GCallback func, gpointer data,
+    gint priority);
 
 
 /* COMP_REAL_START: actual position to start current playback at. */
@@ -249,20 +258,35 @@ static void _restart_task (UpdateCompositionData * ucompo);
    (MIN (comp->priv->segment->stop, NLE_OBJECT_STOP (comp))) :                 \
    NLE_OBJECT_STOP (comp))
 
-#define MAIN_CONTEXT_LOCK(comp) G_STMT_START {                       \
-  GST_LOG_OBJECT (comp, "Getting MAIN_CONTEXT_LOCK in thread %p",    \
+#define ACTIONS_LOCK(comp) G_STMT_START {                       \
+  GST_LOG_OBJECT (comp, "Getting ACTIONS_LOCK in thread %p",    \
         g_thread_self());                                            \
-  g_mutex_lock(&((NleComposition*)comp)->priv->mcontext_lock);    \
-  GST_LOG_OBJECT (comp, "Got MAIN_CONTEXT_LOCK in thread %p",        \
+  g_mutex_lock(&((NleComposition*)comp)->priv->actions_lock);    \
+  GST_LOG_OBJECT (comp, "Got ACTIONS_LOCK in thread %p",        \
         g_thread_self());                                            \
 } G_STMT_END
 
-#define MAIN_CONTEXT_UNLOCK(comp) G_STMT_START {                     \
-  g_mutex_unlock(&((NleComposition*)comp)->priv->mcontext_lock);  \
-  GST_LOG_OBJECT (comp, "Unlocked MAIN_CONTEXT_LOCK in thread %p",   \
+#define ACTIONS_UNLOCK(comp) G_STMT_START {                     \
+  g_mutex_unlock(&((NleComposition*)comp)->priv->actions_lock);  \
+  GST_LOG_OBJECT (comp, "Unlocked ACTIONS_LOCK in thread %p",   \
         g_thread_self());                                            \
 } G_STMT_END
 
+#define WAIT_FOR_AN_ACTION(comp) G_STMT_START {                     \
+  GST_LOG_OBJECT (comp, "Waiting for an action in thread %p",       \
+        g_thread_self());                                           \
+  g_cond_wait(&((NleComposition*)comp)->priv->actions_cond,         \
+      &((NleComposition*)comp)->priv->actions_lock);                  \
+  GST_LOG_OBJECT (comp, "Done WAITING for an action in thread %p",       \
+        g_thread_self());                                           \
+} G_STMT_END
+
+#define SIGNAL_NEW_ACTION(comp) G_STMT_START {                     \
+  GST_LOG_OBJECT (comp, "Signalling new action from thread %p",       \
+        g_thread_self());                                           \
+  g_cond_signal(&((NleComposition*)comp)->priv->actions_cond);     \
+} G_STMT_END
+
 #define GET_TASK_LOCK(comp)    (&(NLE_COMPOSITION(comp)->task_rec_lock))
 
 static inline gboolean
@@ -287,42 +311,63 @@ _assert_proper_thread (NleComposition * comp)
 }
 
 static void
-_destroy_gsource (GSource * source)
+_remove_actions_for_type (NleComposition * comp, GCallback callback)
 {
-  g_source_destroy (source);
-  g_source_unref (source);
-}
+  GList *tmp;
 
-static void
-_remove_all_update_sources (NleComposition * comp)
-{
-  MAIN_CONTEXT_LOCK (comp);
-  g_list_free_full (comp->priv->update_gsources,
-      (GDestroyNotify) _destroy_gsource);
-  comp->priv->update_gsources = NULL;
-  MAIN_CONTEXT_UNLOCK (comp);
-}
+  ACTIONS_LOCK (comp);
+  for (tmp = comp->priv->actions; tmp; tmp = tmp->next) {
+    Action *act = tmp->data;
+
+    if (ACTION_CALLBACK (act) == callback) {
+      g_closure_unref ((GClosure *) act);
+      comp->priv->actions = g_list_delete_link (comp->priv->actions, tmp);
+    }
+  }
+  ACTIONS_UNLOCK (comp);
 
-static void
-_remove_all_seek_sources (NleComposition * comp)
-{
-  MAIN_CONTEXT_LOCK (comp);
-  g_list_free_full (comp->priv->seek_gsources,
-      (GDestroyNotify) _destroy_gsource);
-  comp->priv->seek_gsources = NULL;
-  MAIN_CONTEXT_UNLOCK (comp);
 }
 
 static void
-iterate_main_context_func (NleComposition * comp)
+_execute_actions (NleComposition * comp)
 {
-  if (comp->priv->running == FALSE) {
+  NleCompositionPrivate *priv = comp->priv;
+
+  ACTIONS_LOCK (comp);
+  if (priv->running == FALSE) {
     GST_DEBUG_OBJECT (comp, "Not running anymore");
 
+    ACTIONS_UNLOCK (comp);
     return;
   }
 
-  g_main_context_iteration (comp->priv->mcontext, TRUE);
+  if (priv->actions == NULL)
+    WAIT_FOR_AN_ACTION (comp);
+
+  if (comp->priv->running == FALSE) {
+    GST_INFO_OBJECT (comp, "Done waiting but not running anymore");
+
+    ACTIONS_UNLOCK (comp);
+    return;
+  }
+
+  if (priv->actions) {
+    GValue params[1] = { G_VALUE_INIT };
+    GList *lact;
+
+    g_value_init (&params[0], G_TYPE_OBJECT);
+    g_value_set_object (&params[0], comp);
+
+    lact = priv->actions;
+    priv->actions = priv->actions->next;
+    ACTIONS_UNLOCK (comp);
+
+    GST_INFO_OBJECT (comp, "Invoking %p:%s",
+        lact->data, GST_DEBUG_FUNCPTR_NAME ((ACTION_CALLBACK (lact->data))));
+    g_closure_invoke (lact->data, NULL, 1, params, NULL);
+  } else {
+    ACTIONS_UNLOCK (comp);
+  }
 }
 
 static void
@@ -330,14 +375,15 @@ _start_task (NleComposition * comp)
 {
   GstTask *task;
 
+  ACTIONS_LOCK (comp);
   comp->priv->running = TRUE;
+  ACTIONS_UNLOCK (comp);
 
   GST_OBJECT_LOCK (comp);
 
   task = comp->task;
   if (task == NULL) {
-    task =
-        gst_task_new ((GstTaskFunction) iterate_main_context_func, comp, NULL);
+    task = gst_task_new ((GstTaskFunction) _execute_actions, comp, NULL);
     gst_task_set_lock (task, GET_TASK_LOCK (comp));
     GST_INFO_OBJECT (comp, "created task %p", task);
     comp->task = task;
@@ -355,10 +401,12 @@ _stop_task (NleComposition * comp)
 
   GST_INFO_OBJECT (comp, "Stoping children management task");
 
+  ACTIONS_LOCK (comp);
   comp->priv->running = FALSE;
 
-  /*  Clean the stack of GSource set on the MainContext */
-  g_main_context_wakeup (comp->priv->mcontext);
+  /*  Make sure we do not stay blocked trying to execute an action */
+  SIGNAL_NEW_ACTION (comp);
+  ACTIONS_UNLOCK (comp);
 
   GST_DEBUG_OBJECT (comp, "stop task");
 
@@ -430,21 +478,14 @@ _post_start_composition_update_done (NleComposition * comp,
 }
 
 static void
-_free_seek_data (SeekData * seekd)
-{
-  gst_event_unref (seekd->event);
-  g_slice_free (SeekData, seekd);
-}
-
-static gboolean
-_seek_pipeline_func (SeekData * seekd)
+_seek_pipeline_func (NleComposition * comp, SeekData * seekd)
 {
   gdouble rate;
   GstFormat format;
   GstSeekFlags flags;
   GstSeekType cur_type, stop_type;
   gint64 cur, stop;
-  NleCompositionPrivate *priv = seekd->comp->priv;
+  NleCompositionPrivate *priv = comp->priv;
 
   gst_event_parse_seek (seekd->event, &rate, &format, &flags,
       &cur_type, &cur, &stop_type, &stop);
@@ -467,7 +508,7 @@ _seek_pipeline_func (SeekData * seekd)
         " Not seeking", GST_TIME_ARGS (priv->segment->start),
         GST_TIME_ARGS (NLE_OBJECT_STOP (seekd->comp)));
     GST_FIXME_OBJECT (seekd->comp, "HANDLE error async!");
-    goto beach;
+    return;
   }
 
   _post_start_composition_update (seekd->comp,
@@ -488,51 +529,6 @@ _seek_pipeline_func (SeekData * seekd)
 
   _post_start_composition_update_done (seekd->comp,
       gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK);
-
-beach:
-  return G_SOURCE_REMOVE;
-}
-
-static void
-_add_gsource (NleComposition * comp, GSourceFunc func,
-    gpointer data, GDestroyNotify destroy, gint priority)
-{
-  GSource *source;
-  NleCompositionPrivate *priv = comp->priv;
-
-  GST_INFO_OBJECT (comp, "Adding GSource for function: %s",
-      GST_DEBUG_FUNCPTR_NAME (func));
-
-  MAIN_CONTEXT_LOCK (comp);
-  source = g_idle_source_new ();
-  g_source_set_callback (source, func, data, destroy);
-  g_source_set_priority (source, priority);
-
-  if (func == (GSourceFunc) _update_pipeline_func)
-    priv->update_gsources = g_list_prepend (priv->update_gsources, source);
-  else if (func == (GSourceFunc) _seek_pipeline_func)
-    priv->seek_gsources = g_list_prepend (priv->seek_gsources, source);
-  else
-    priv->gsources = g_list_prepend (priv->gsources, source);
-
-  g_source_attach (source, priv->mcontext);
-  MAIN_CONTEXT_UNLOCK (comp);
-}
-
-static void
-_add_seek_gsource (NleComposition * comp, GstEvent * event)
-{
-  SeekData *seekd = g_slice_new0 (SeekData);
-
-  GST_DEBUG_OBJECT (comp, "Adding GSource");
-
-  seekd->comp = comp;
-  seekd->event = event;
-
-  comp->priv->next_eos_seqnum = 0;
-  comp->priv->real_eos_seqnum = 0;
-  _add_gsource (comp, (GSourceFunc) _seek_pipeline_func, seekd,
-      (GDestroyNotify) _free_seek_data, G_PRIORITY_DEFAULT);
 }
 
 /*  Must be called with OBJECTS_LOCK taken */
@@ -590,7 +586,6 @@ _commit_all_values (NleComposition * comp)
   NleCompositionPrivate *priv = comp->priv;
 
   priv->next_base_time = 0;
-  GST_ERROR_OBJECT (comp, "Commiting state");
 
   _process_pending_entries (comp);
 
@@ -609,9 +604,8 @@ _commit_all_values (NleComposition * comp)
 }
 
 static gboolean
-_initialize_stack_func (UpdateCompositionData * ucompo)
+_initialize_stack_func (NleComposition * comp, UpdateCompositionData * ucompo)
 {
-  NleComposition *comp = ucompo->comp;
   NleCompositionPrivate *priv = comp->priv;
 
   _commit_all_values (comp);
@@ -631,15 +625,8 @@ _initialize_stack_func (UpdateCompositionData * ucompo)
 }
 
 static void
-_free_child_io_data (gpointer childio)
+_remove_object_func (NleComposition * comp, ChildIOData * childio)
 {
-  g_slice_free (ChildIOData, childio);
-}
-
-static gboolean
-_remove_object_func (ChildIOData * childio)
-{
-  NleComposition *comp = childio->comp;
   NleObject *object = childio->object;
 
   NleCompositionPrivate *priv = comp->priv;
@@ -653,45 +640,44 @@ _remove_object_func (ChildIOData * childio)
           " for addition, removing it from the addition list", object);
 
       g_hash_table_remove (priv->pending_io, object);
-      return G_SOURCE_REMOVE;
+      return;
     }
 
     GST_ERROR_OBJECT (comp, "Object %" GST_PTR_FORMAT " is "
         " not in the composition", object);
 
-    return G_SOURCE_REMOVE;
+    return;
   }
 
   if (in_pending_io) {
     GST_WARNING_OBJECT (comp, "Object %" GST_PTR_FORMAT " is already marked"
         " for removal", object);
 
-    return G_SOURCE_REMOVE;
+    return;
   }
 
   g_hash_table_add (priv->pending_io, object);
 
-  return G_SOURCE_REMOVE;
+  return;
 }
 
 static void
-_add_remove_object_gsource (NleComposition * comp, NleObject * object)
+_add_remove_object_action (NleComposition * comp, NleObject * object)
 {
   ChildIOData *childio = g_slice_new0 (ChildIOData);
 
-  GST_DEBUG_OBJECT (comp, "Adding GSource");
+  GST_DEBUG_OBJECT (comp, "Adding Action");
 
   childio->comp = comp;
   childio->object = object;
 
-  _add_gsource (comp, (GSourceFunc) _remove_object_func,
-      childio, _free_child_io_data, G_PRIORITY_DEFAULT);
+  _add_action (comp, G_CALLBACK (_remove_object_func),
+      childio, G_PRIORITY_DEFAULT);
 }
 
-static gboolean
-_add_object_func (ChildIOData * childio)
+static void
+_add_object_func (NleComposition * comp, ChildIOData * childio)
 {
-  NleComposition *comp = childio->comp;
   NleObject *object = childio->object;
   NleCompositionPrivate *priv = comp->priv;
   NleObject *in_pending_io;
@@ -702,34 +688,32 @@ _add_object_func (ChildIOData * childio)
     GST_ERROR_OBJECT (comp, "Object %" GST_PTR_FORMAT " is "
         " already in the composition", object);
 
-    return G_SOURCE_REMOVE;
+    return;
   }
 
   if (in_pending_io) {
     GST_WARNING_OBJECT (comp, "Object %" GST_PTR_FORMAT " is already marked"
         " for addition", object);
 
-    return G_SOURCE_REMOVE;
+    return;
   }
 
 
   g_hash_table_add (priv->pending_io, object);
-
-  return G_SOURCE_REMOVE;
 }
 
 static void
-_add_add_object_gsource (NleComposition * comp, NleObject * object)
+_add_add_object_action (NleComposition * comp, NleObject * object)
 {
   ChildIOData *childio = g_slice_new0 (ChildIOData);
 
-  GST_DEBUG_OBJECT (comp, "Adding GSource");
+  GST_DEBUG_OBJECT (comp, "Adding Action");
 
   childio->comp = comp;
   childio->object = object;
 
-  _add_gsource (comp, (GSourceFunc) _add_object_func, childio,
-      _free_child_io_data, G_PRIORITY_DEFAULT);
+  _add_action (comp, G_CALLBACK (_add_object_func), childio,
+      G_PRIORITY_DEFAULT);
 }
 
 static void
@@ -739,8 +723,82 @@ _free_update_compo_data (gpointer updatepipelinedata)
 }
 
 static void
-_add_update_compo_gsource (NleComposition * comp,
-    GSourceFunc func, NleUpdateStackReason reason)
+_free_action (Action * action, gpointer udata)
+{
+  if (ACTION_CALLBACK (action) == _seek_pipeline_func) {
+    SeekData *seekd = (SeekData *) udata;
+
+    gst_event_unref (seekd->event);
+    g_slice_free (SeekData, seekd);
+  } else if (ACTION_CALLBACK (action) == _remove_object_func ||
+      ACTION_CALLBACK (action) == _add_object_func) {
+    g_slice_free (ChildIOData, udata);
+  } else if (ACTION_CALLBACK (action) == _update_pipeline_func ||
+      ACTION_CALLBACK (action) == _commit_func ||
+      ACTION_CALLBACK (action) == _initialize_stack_func) {
+    g_slice_free (UpdateCompositionData, udata);
+  }
+}
+
+static void
+_add_action (NleComposition * comp, GCallback func,
+    gpointer data, gint priority)
+{
+  Action *action;
+  NleCompositionPrivate *priv = comp->priv;
+
+
+  action = (Action *) g_closure_new_simple (sizeof (Action), data);
+  g_closure_add_finalize_notifier ((GClosure *) action, data,
+      (GClosureNotify) _free_action);
+  ACTION_CALLBACK (action) = func;
+  action->priority = priority;
+  g_closure_set_marshal ((GClosure *) action, g_cclosure_marshal_VOID__VOID);
+
+  ACTIONS_LOCK (comp);
+  GST_INFO_OBJECT (comp, "Adding Action for function: %p:%s",
+      action, GST_DEBUG_FUNCPTR_NAME (func));
+
+  if (func == G_CALLBACK (_emit_commited_signal_func))
+    priv->actions = g_list_prepend (priv->actions, action);
+  else
+    priv->actions = g_list_append (priv->actions, action);
+
+  SIGNAL_NEW_ACTION (comp);
+  ACTIONS_UNLOCK (comp);
+}
+
+static void
+_add_seek_action (NleComposition * comp, GstEvent * event)
+{
+  SeekData *seekd = g_slice_new0 (SeekData);
+
+  GST_DEBUG_OBJECT (comp, "Adding Action");
+
+  seekd->comp = comp;
+  seekd->event = event;
+
+  comp->priv->next_eos_seqnum = 0;
+  comp->priv->real_eos_seqnum = 0;
+  _add_action (comp, G_CALLBACK (_seek_pipeline_func), seekd,
+      G_PRIORITY_DEFAULT);
+}
+
+static void
+_remove_update_actions (NleComposition * comp)
+{
+  _remove_actions_for_type (comp, G_CALLBACK (_update_pipeline_func));
+}
+
+static void
+_remove_seek_actions (NleComposition * comp)
+{
+  _remove_actions_for_type (comp, G_CALLBACK (_seek_pipeline_func));
+}
+
+static void
+_add_update_compo_action (NleComposition * comp,
+    GCallback callback, NleUpdateStackReason reason)
 {
   UpdateCompositionData *ucompo = g_slice_new0 (UpdateCompositionData);
 
@@ -751,8 +809,7 @@ _add_update_compo_gsource (NleComposition * comp,
   GST_INFO_OBJECT (comp, "Updating because: %s -- Setting seqnum: %i",
       UPDATE_PIPELINE_REASONS[reason], ucompo->seqnum);
 
-  _add_gsource (comp, (GSourceFunc) func, ucompo,
-      _free_update_compo_data, G_PRIORITY_DEFAULT);
+  _add_action (comp, callback, ucompo, G_PRIORITY_DEFAULT);
 }
 
 static void
@@ -876,9 +933,8 @@ nle_composition_init (NleComposition * comp)
 
   priv->objects_hash = g_hash_table_new (g_direct_hash, g_direct_equal);
 
-  priv->mcontext = g_main_context_new ();
-  g_main_context_set_dispatches_per_iteration (priv->mcontext, 1);
-  g_mutex_init (&priv->mcontext_lock);
+  g_mutex_init (&priv->actions_lock);
+  g_cond_init (&priv->actions_cond);
 
   priv->pending_io = g_hash_table_new (g_direct_hash, g_direct_equal);
 
@@ -958,7 +1014,8 @@ nle_composition_finalize (GObject * object)
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 
-  g_mutex_clear (&priv->mcontext_lock);
+  g_mutex_clear (&priv->actions_lock);
+  g_cond_clear (&priv->actions_cond);
 }
 
 /* signal_duration_change
@@ -1141,7 +1198,7 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
       }
 
       if (priv->next_eos_seqnum == seqnum)
-        _add_update_compo_gsource (comp, (GSourceFunc) _update_pipeline_func,
+        _add_update_compo_action (comp, G_CALLBACK (_update_pipeline_func),
             COMP_UPDATE_STACK_ON_EOS);
       else
         GST_INFO_OBJECT (comp,
@@ -1198,8 +1255,8 @@ have_to_update_pipeline (NleComposition * comp,
 static gboolean
 nle_composition_commit_func (NleObject * object, gboolean recurse)
 {
-  _add_update_compo_gsource (NLE_COMPOSITION (object),
-      (GSourceFunc) _commit_func, COMP_UPDATE_STACK_ON_COMMIT);
+  _add_update_compo_action (NLE_COMPOSITION (object),
+      G_CALLBACK (_commit_func), COMP_UPDATE_STACK_ON_COMMIT);
 
   return TRUE;
 }
@@ -1398,7 +1455,7 @@ seek_handling (NleComposition * comp, gint32 seqnum,
     gst_event_set_seqnum (toplevel_seek, seqnum);
     _set_real_eos_seqnum_from_seek (comp, toplevel_seek);
 
-    _remove_all_update_sources (comp);
+    _remove_update_actions (comp);
     _seek_current_stack (comp, toplevel_seek,
         _have_to_flush_downstream (update_stack_reason));
     update_operations_base_time (comp, !(comp->priv->segment->rate >= 0.0));
@@ -1421,7 +1478,7 @@ nle_composition_event_handler (GstPad * ghostpad, GstObject * parent,
     case GST_EVENT_SEEK:
     {
       if (!priv->seeking_itself) {
-        _add_seek_gsource (comp, event);
+        _add_seek_action (comp, event);
         event = NULL;
         GST_FIXME_OBJECT (comp, "HANDLE seeking errors!");
 
@@ -1996,14 +2053,12 @@ _set_current_bin_to_ready (NleComposition * comp, gboolean flush_downstream)
   comp->priv->tearing_down_stack = FALSE;
 }
 
-static gboolean
-_emit_commited_signal_func (NleComposition * comp)
+static void
+_emit_commited_signal_func (NleComposition * comp, gpointer udata)
 {
   GST_INFO_OBJECT (comp, "Emiting COMMITED now that the stack " "is ready");
 
   g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
-
-  return G_SOURCE_REMOVE;
 }
 
 static void
@@ -2015,7 +2070,7 @@ _restart_task (UpdateCompositionData * ucompo)
       UPDATE_PIPELINE_REASONS[ucompo->reason]);
 
   if (ucompo->reason == COMP_UPDATE_STACK_ON_COMMIT)
-    _add_gsource (comp, (GSourceFunc) _emit_commited_signal_func, comp, NULL,
+    _add_action (comp, G_CALLBACK (_emit_commited_signal_func), comp,
         G_PRIORITY_HIGH);
 
   comp->priv->awaited_caps_seqnum = 0;
@@ -2071,11 +2126,10 @@ _is_update_done_cb (GstPad * pad, GstPadProbeInfo * info,
   return GST_PAD_PROBE_OK;
 }
 
-static gboolean
-_commit_func (UpdateCompositionData * ucompo)
+static void
+_commit_func (NleComposition * comp, UpdateCompositionData * ucompo)
 {
   GstClockTime curpos;
-  NleComposition *comp = ucompo->comp;
   NleCompositionPrivate *priv = comp->priv;
 
   _post_start_composition_update (comp, ucompo->seqnum, ucompo->reason);
@@ -2090,7 +2144,7 @@ _commit_func (UpdateCompositionData * ucompo)
     g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, FALSE);
     _post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
 
-    return G_SOURCE_REMOVE;
+    return;
   }
 
   if (priv->initialized == FALSE) {
@@ -2115,15 +2169,12 @@ _commit_func (UpdateCompositionData * ucompo)
   }
 
   _post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
-
-  return G_SOURCE_REMOVE;
 }
 
-static gboolean
-_update_pipeline_func (UpdateCompositionData * ucompo)
+static void
+_update_pipeline_func (NleComposition * comp, UpdateCompositionData * ucompo)
 {
   gboolean reverse;
-  NleComposition *comp = ucompo->comp;
   NleCompositionPrivate *priv = comp->priv;
 
   _post_start_composition_update (comp, ucompo->seqnum, ucompo->reason);
@@ -2163,8 +2214,6 @@ _update_pipeline_func (UpdateCompositionData * ucompo)
   }
 
   _post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
-
-  return G_SOURCE_REMOVE;
 }
 
 static void
@@ -2203,14 +2252,14 @@ nle_composition_change_state (GstElement * element, GstStateChange transition)
       GST_DEBUG_OBJECT (comp,
           "Setting all children to READY and locking their state");
 
-      _add_update_compo_gsource (comp, (GSourceFunc) _initialize_stack_func,
+      _add_update_compo_action (comp, G_CALLBACK (_initialize_stack_func),
           COMP_UPDATE_STACK_INITIALIZE);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
       _stop_task (comp);
 
-      _remove_all_update_sources (comp);
-      _remove_all_seek_sources (comp);
+      _remove_update_actions (comp);
+      _remove_seek_actions (comp);
       _set_all_children_state (comp, GST_STATE_READY);
       nle_composition_reset (comp);
 
@@ -2219,8 +2268,8 @@ nle_composition_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_READY_TO_NULL:
       _stop_task (comp);
 
-      _remove_all_update_sources (comp);
-      _remove_all_seek_sources (comp);
+      _remove_update_actions (comp);
+      _remove_seek_actions (comp);
       _set_all_children_state (comp, GST_STATE_NULL);
       break;
     default:
@@ -2633,9 +2682,9 @@ _activate_new_stack (NleComposition * comp)
 
 resync_state:
   gst_element_set_locked_state (priv->current_bin, FALSE);
-  GST_ERROR ("going back to parent state");
+  GST_DEBUG ("going back to parent state");
   gst_element_sync_state_with_parent (priv->current_bin);
-  GST_ERROR ("gone back to parent state");
+  GST_DEBUG ("gone back to parent state");
 
   return TRUE;
 }
@@ -2770,7 +2819,7 @@ update_pipeline (NleComposition * comp, GstClockTime currenttime, gint32 seqnum,
   gst_event_set_seqnum (toplevel_seek, seqnum);
   _set_real_eos_seqnum_from_seek (comp, toplevel_seek);
 
-  _remove_all_update_sources (comp);
+  _remove_update_actions (comp);
 
   /* If stacks are different, unlink/relink objects */
   if (!samestack) {
@@ -2836,7 +2885,7 @@ nle_composition_add_object (GstBin * bin, GstElement * element)
   object = NLE_OBJECT (element);
 
   object->in_composition = TRUE;
-  _add_add_object_gsource (comp, object);
+  _add_add_object_action (comp, object);
 
   return TRUE;
 }
@@ -2942,7 +2991,7 @@ nle_composition_remove_object (GstBin * bin, GstElement * element)
   object = NLE_OBJECT (element);
 
   object->in_composition = FALSE;
-  _add_remove_object_gsource (comp, object);
+  _add_remove_object_action (comp, object);
 
   return TRUE;
 }