composition: Implement the logic to PAUSE the task while executing actions
authorThibault Saunier <tsaunier@gnome.org>
Mon, 7 Jul 2014 21:07:15 +0000 (23:07 +0200)
committerThibault Saunier <tsaunier@gnome.org>
Fri, 31 Oct 2014 10:58:09 +0000 (11:58 +0100)
We need to wait for the pipeline update to be actually finished before we can start another
action. That means that we pause the task until one buffer from the new stack is
outputed.

Co-Authored by: Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>

gnl/gnlcomposition.c

index 32cb37f16e190af3e3a0ca5cc92761be34313b5f..d5b2b61c6b65515606caa0a8903475b9a6e4357a 100644 (file)
@@ -172,6 +172,12 @@ struct _GnlCompositionPrivate
 
   gboolean seeking_itself;
   gint real_eos_seqnum;
+
+  /* While we do not get a buffer on our srcpad,
+   * we are not commited */
+  gulong commited_probeid;
+  /* 0 means that we already received the right segment */
+  gint awaited_segment_seqnum;
 };
 
 static guint _signals[LAST_SIGNAL] = { 0 };
@@ -222,6 +228,10 @@ static void _relink_single_node (GnlComposition * comp, GNode * node,
     GstEvent * toplevel_seek);
 static gboolean update_pipeline_func (GnlComposition * comp);
 static gboolean _commit_func (GnlComposition * comp);
+static gboolean lock_child_state (GValue * item, GValue * ret,
+    gpointer udata G_GNUC_UNUSED);
+static gboolean
+set_child_caps (GValue * item, GValue * ret G_GNUC_UNUSED, GnlObject * comp);
 static GstEvent *get_new_seek_event (GnlComposition * comp, gboolean initial,
     gboolean updatestoponly);
 static gboolean
@@ -298,6 +308,12 @@ struct _GnlCompositionEntry
 {
   GnlObject *object;
   GnlComposition *comp;
+
+  /* handler id for block probe */
+  gulong probeid;
+  gulong dataprobeid;
+
+  gboolean seeked;
 };
 
 static void
@@ -774,6 +790,20 @@ gnl_composition_class_init (GnlCompositionClass * klass)
 static void
 hash_value_destroy (GnlCompositionEntry * entry)
 {
+  GstPad *srcpad;
+  GstElement *element = GST_ELEMENT (entry->object);
+
+  srcpad = GNL_OBJECT_SRC (element);
+  if (entry->probeid) {
+    gst_pad_remove_probe (srcpad, entry->probeid);
+    entry->probeid = 0;
+  }
+
+  if (entry->dataprobeid) {
+    gst_pad_remove_probe (srcpad, entry->dataprobeid);
+    entry->dataprobeid = 0;
+  }
+
   g_slice_free (GnlCompositionEntry, entry);
 }
 
@@ -947,6 +977,42 @@ signal_duration_change (GnlComposition * comp)
       gst_message_new_duration_changed (GST_OBJECT_CAST (comp)));
 }
 
+static gboolean
+unblock_child_pads (GValue * item, GValue * ret G_GNUC_UNUSED,
+    GnlComposition * comp)
+{
+  GstPad *pad;
+  GstElement *child = g_value_get_object (item);
+  GnlCompositionEntry *entry = COMP_ENTRY (comp, child);
+
+  GST_DEBUG_OBJECT (child, "unblocking pads");
+
+  pad = GNL_OBJECT_SRC (child);
+  if (entry->probeid) {
+    gst_pad_remove_probe (pad, entry->probeid);
+    entry->probeid = 0;
+  }
+  return TRUE;
+}
+
+static void
+unblock_children (GnlComposition * comp)
+{
+  GstIterator *children;
+
+  children = gst_bin_iterate_elements (GST_BIN (comp->priv->current_bin));
+
+retry:
+  if (G_UNLIKELY (gst_iterator_fold (children,
+              (GstIteratorFoldFunction) unblock_child_pads, NULL,
+              comp) == GST_ITERATOR_RESYNC)) {
+    gst_iterator_resync (children);
+    goto retry;
+  }
+  gst_iterator_free (children);
+}
+
+
 static gboolean
 reset_child (GValue * item, GValue * ret G_GNUC_UNUSED, gpointer user_data)
 {
@@ -956,6 +1022,9 @@ reset_child (GValue * item, GValue * ret G_GNUC_UNUSED, gpointer user_data)
   GnlObject *object;
   GstPad *srcpad, *peerpad;
 
+  GST_DEBUG_OBJECT (child, "unlocking state");
+  gst_element_set_locked_state (child, FALSE);
+
   entry = COMP_ENTRY (comp, child);
   object = entry->object;
   srcpad = object->srcpad;
@@ -968,6 +1037,18 @@ reset_child (GValue * item, GValue * ret G_GNUC_UNUSED, gpointer user_data)
   return TRUE;
 }
 
+static gboolean
+lock_child_state (GValue * item, GValue * ret G_GNUC_UNUSED,
+    gpointer udata G_GNUC_UNUSED)
+{
+  GstElement *child = g_value_get_object (item);
+
+  GST_DEBUG_OBJECT (child, "locking state");
+  gst_element_set_locked_state (child, TRUE);
+
+  return TRUE;
+}
+
 static void
 reset_children (GnlComposition * comp)
 {
@@ -1953,6 +2034,16 @@ get_clean_toplevel_stack (GnlComposition * comp, GstClockTime * timestamp,
 }
 
 
+static gboolean
+set_child_caps (GValue * item, GValue * ret G_GNUC_UNUSED, GnlObject * comp)
+{
+  GstElement *child = g_value_get_object (item);
+
+  gnl_object_set_caps ((GnlObject *) child, comp->caps);
+
+  return TRUE;
+}
+
 /*  Must be called with OBJECTS_LOCK taken */
 static void
 _set_current_bin_to_ready (GnlComposition * comp)
@@ -2009,6 +2100,82 @@ _process_pending_entries (GnlComposition * comp)
   g_hash_table_remove_all (priv->pending_io);
 }
 
+static gboolean
+_emit_commited_signal_func (GnlComposition * comp)
+{
+  GST_INFO_OBJECT (comp, "Emiting COMMITED now that the stack "
+      "is ready");
+
+  g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
+
+  return G_SOURCE_REMOVE;
+}
+
+static GstPadProbeReturn
+_add_emit_commited_and_restart_task (GnlComposition * comp)
+{
+  GST_ERROR_OBJECT (comp, "Setup commit and restart task!");
+
+  MAIN_CONTEXT_LOCK (comp);
+  g_main_context_invoke_full (comp->priv->mcontext, G_PRIORITY_HIGH,
+      (GSourceFunc) _emit_commited_signal_func, comp, NULL);
+  MAIN_CONTEXT_UNLOCK (comp);
+
+
+  comp->priv->awaited_segment_seqnum = 0;
+  comp->priv->commited_probeid = 0;
+
+  gst_task_start (comp->task);
+
+  return GST_PAD_PROBE_REMOVE;
+}
+
+static GstPadProbeReturn
+_commit_done_cb (GstPad * pad, GstPadProbeInfo * info, GnlComposition * comp)
+{
+  if (comp->priv->awaited_segment_seqnum) {
+    if (GST_IS_EVENT (info->data)) {
+      gint seqnum = gst_event_get_seqnum (info->data);
+
+      GST_DEBUG_OBJECT (comp, "Got event %s -- with seqnum: %i "
+          "(awaited_segment_seqnum: %i)",
+          GST_EVENT_TYPE_NAME (info->data), seqnum,
+          comp->priv->awaited_segment_seqnum);
+
+      if (seqnum == comp->priv->awaited_segment_seqnum) {
+
+        if (GST_EVENT_TYPE (info->data) == GST_EVENT_EOS) {
+          GST_INFO_OBJECT (comp, "Received EOS even before"
+              " receiving SEGMENT with proper seqnum -> we are done");
+
+          return _add_emit_commited_and_restart_task (comp);
+
+        } else if (GST_EVENT_TYPE (info->data) == GST_EVENT_SEGMENT) {
+
+          GST_INFO_OBJECT (comp, "Got segment event with right seqnum"
+              " now waiting for a buffer to restart playing with our "
+              " children");
+
+          comp->priv->awaited_segment_seqnum = 0;
+        }
+      }
+    }
+
+    return GST_PAD_PROBE_OK;
+  } else if (GST_IS_BUFFER (info->data)) {
+
+    GST_INFO_OBJECT (comp, "Got %" GST_PTR_FORMAT " concidering commit "
+        "as done", info->data);
+
+    return _add_emit_commited_and_restart_task (comp);
+  }
+
+  GST_INFO_OBJECT (comp, "Got %" GST_PTR_FORMAT " still waiting for a buffer",
+      info->data);
+
+  return GST_PAD_PROBE_OK;
+}
+
 static inline gboolean
 _commit_values (GnlComposition * comp)
 {
@@ -2033,6 +2200,8 @@ _commit_func (GnlComposition * comp)
   GstClockTime curpos;
   GnlCompositionPrivate *priv = comp->priv;
 
+  GST_INFO_OBJECT (comp, "Commiting state");
+
   COMP_OBJECTS_LOCK (comp);
 
   /* Get current so that it represent the duration it was
@@ -2043,7 +2212,7 @@ _commit_func (GnlComposition * comp)
 
   if (_commit_values (comp) == FALSE) {
     COMP_OBJECTS_UNLOCK (comp);
-    GST_ERROR_OBJECT (comp, "Nothing to commit, leaving");
+    GST_INFO_OBJECT (comp, "Nothing to commit, leaving");
 
     g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, FALSE);
 
@@ -2057,18 +2226,26 @@ _commit_func (GnlComposition * comp)
       (priv->objects_stop, (GCompareFunc) objects_stop_compare);
 
   if (priv->initialized == FALSE) {
+    GST_DEBUG_OBJECT (comp, "Not initialized yet, just updating values");
+
     update_start_stop_duration (comp);
+
+    g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
+
   } else {
     /* And update the pipeline at current position if needed */
 
     update_start_stop_duration (comp);
     update_pipeline (comp, curpos, TRUE, TRUE);
 
+    if (!priv->current) {
+      GST_INFO_OBJECT (comp, "No new stack set, we can go and keep acting on"
+          " our children");
+      g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
+    }
   }
   COMP_OBJECTS_UNLOCK (comp);
 
-  GST_ERROR ("emitted signal");
-  g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
   return G_SOURCE_REMOVE;
 }
 
@@ -2139,6 +2316,7 @@ _set_all_children_state (GnlComposition * comp, GstState state)
 static GstStateChangeReturn
 gnl_composition_change_state (GstElement * element, GstStateChange transition)
 {
+  GstIterator *children;
   GnlComposition *comp = (GnlComposition *) element;
   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
 
@@ -2149,6 +2327,32 @@ gnl_composition_change_state (GstElement * element, GstStateChange transition)
   switch (transition) {
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       gnl_composition_reset (comp);
+
+      /* state-lock all elements */
+      GST_DEBUG_OBJECT (comp,
+          "Setting all children to READY and locking their state");
+
+      children = gst_bin_iterate_elements (GST_BIN (comp->priv->current_bin));
+
+      while (G_UNLIKELY (gst_iterator_fold (children,
+                  (GstIteratorFoldFunction) lock_child_state, NULL,
+                  NULL) == GST_ITERATOR_RESYNC)) {
+        gst_iterator_resync (children);
+      }
+      gst_iterator_free (children);
+
+      /* Set caps on all objects */
+      if (G_UNLIKELY (!gst_caps_is_any (GNL_OBJECT (comp)->caps))) {
+        children = gst_bin_iterate_elements (GST_BIN (comp->priv->current_bin));
+
+        while (G_UNLIKELY (gst_iterator_fold (children,
+                    (GstIteratorFoldFunction) set_child_caps, NULL,
+                    comp) == GST_ITERATOR_RESYNC)) {
+          gst_iterator_resync (children);
+        }
+        gst_iterator_free (children);
+      }
+
       _add_initialize_stack_gsource (comp);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
@@ -2168,6 +2372,15 @@ gnl_composition_change_state (GstElement * element, GstStateChange transition)
   if (ret == GST_STATE_CHANGE_FAILURE)
     return ret;
 
+  switch (transition) {
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+    case GST_STATE_CHANGE_READY_TO_NULL:
+      unblock_children (comp);
+      break;
+    default:
+      break;
+  }
+
   return ret;
 }
 
@@ -2464,20 +2677,44 @@ _deactivate_stack (GnlComposition * comp)
 
   if (ptarget)
     gst_object_unref (ptarget);
+
+/*   priv->current = NULL;
+ */
 }
 
 static void
 _relink_new_stack (GnlComposition * comp, GNode * stack,
     GstEvent * toplevel_seek)
 {
+  GnlCompositionPrivate *priv = comp->priv;
 
-
+  GST_ERROR ("RElinking new stack");
   _relink_single_node (comp, stack, toplevel_seek);
+
+  GST_ERROR ("Reseting seqnum to %i", gst_event_get_seqnum (toplevel_seek));
+  GNL_OBJECT (comp)->wanted_seqnum = gst_event_get_seqnum (toplevel_seek);
+
   gst_event_unref (toplevel_seek);
 
-  gst_element_set_locked_state (comp->priv->current_bin, FALSE);
-  gst_element_sync_state_with_parent (comp->priv->current_bin);
-}
+  gst_element_set_locked_state (priv->current_bin, FALSE);
+  gst_element_sync_state_with_parent (priv->current_bin);
+}
+
+/* static void
+ * unlock_activate_stack (GnlComposition * comp, GNode * node, GstState state)
+ * {
+ *   GNode *child;
+ * 
+ *   GST_LOG_OBJECT (comp, "object:%s",
+ *       GST_ELEMENT_NAME ((GstElement *) (node->data)));
+ * 
+ *   gst_element_set_locked_state ((GstElement *) (node->data), FALSE);
+ *   gst_element_set_state (GST_ELEMENT (node->data), state);
+ * 
+ *   for (child = node->children; child; child = child->next)
+ *     unlock_activate_stack (comp, child, state);
+ * }
+ */
 
 static gboolean
 are_same_stacks (GNode * stack1, GNode * stack2)
@@ -2678,8 +2915,8 @@ update_pipeline (GnlComposition * comp, GstClockTime currenttime,
   }
 
   toplevel_seek = get_new_seek_event (comp, TRUE, updatestoponly);
+  stack_seqnum = gst_event_get_seqnum (toplevel_seek);
   if (_is_last_stack (comp)) {
-    stack_seqnum = gst_event_get_seqnum (toplevel_seek);
     g_atomic_int_set (&priv->real_eos_seqnum, stack_seqnum);
 
     GST_ERROR_OBJECT (comp, "Seeting up last stack, seqnum is: %i",
@@ -2696,11 +2933,17 @@ update_pipeline (GnlComposition * comp, GstClockTime currenttime,
   GST_DEBUG_OBJECT (comp, "Setting current stack");
   priv->current = stack;
 
-  if (!samestack && stack) {
-    GST_DEBUG_OBJECT (comp, "activating objects in new stack to %s",
-        gst_element_state_get_name (nextstate));
-    unlock_activate_stack (comp, stack, nextstate);
-    GST_DEBUG_OBJECT (comp, "Finished activating objects in new stack");
+  if (priv->current) {
+    GST_INFO_OBJECT (comp, "New stack set and ready to run, probing src pad"
+        " and stopping children thread until we are actually ready with"
+        " that new stack");
+
+    comp->priv->awaited_segment_seqnum = stack_seqnum;
+    priv->commited_probeid = gst_pad_add_probe (GNL_OBJECT_SRC (comp),
+        GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
+        (GstPadProbeCallback) _commit_done_cb, comp, NULL);
+
+    gst_task_pause (comp->task);
   }
 
   /* Activate stack */