composition: Use a GstPad task to run the update pipeline thread
authorThibault Saunier <tsaunier@gnome.org>
Fri, 27 Jun 2014 10:15:10 +0000 (12:15 +0200)
committerThibault Saunier <tsaunier@gnome.org>
Fri, 31 Oct 2014 10:58:07 +0000 (11:58 +0100)
gnl/gnlcomposition.c
tests/check/gnl/simple.c

index c062581..091b359 100644 (file)
@@ -1,6 +1,8 @@
 /* GStreamer
  * Copyright (C) 2001 Wim Taymans <wim.taymans@gmail.com>
  *               2004-2008 Edward Hervey <bilboed@bilboed.com>
+ *               2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
+ *               2014 Thibault Saunier <tsaunier@gnome.org>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
@@ -133,9 +135,12 @@ struct _GnlCompositionPrivate
   GstPadEventFunction gnl_event_pad_func;
   gboolean send_stream_start;
 
-  GThread *update_pipeline_thread;
-  GCond update_pipeline_cond;
-  GMutex update_pipeline_mutex;
+  GMainContext *mcontext;
+  /* Ensure that when we remove all sources from the maincontext
+   * we can not add any source, avoiding:
+   * "g_source_attach: assertion '!SOURCE_DESTROYED (source)' failed" */
+  GMutex mcontext_lock;
+
 
   gboolean reset_time;
 
@@ -193,6 +198,7 @@ gnl_composition_event_handler (GstPad * ghostpad, GstObject * parent,
 static void
 compare_relink_single_node (GnlComposition * comp, GNode * node,
     GNode * oldstack);
+static gboolean update_pipeline_func (GnlComposition * comp);
 
 
 /* COMP_REAL_START: actual position to start current playback at. */
@@ -236,22 +242,19 @@ compare_relink_single_node (GnlComposition * comp, GNode * node,
     g_mutex_unlock (&comp->priv->flushing_lock);                               \
   } G_STMT_END
 
-#define WAIT_FOR_UPDATE_PIPELINE(comp)   G_STMT_START {                        \
-  GST_INFO_OBJECT (comp, "waiting for EOS from thread %p",                     \
-        g_thread_self());                                                      \
-  g_mutex_lock(&(comp->priv->update_pipeline_mutex));                          \
-  g_cond_wait(&(comp->priv->update_pipeline_cond),                             \
-      &(comp->priv->update_pipeline_mutex));                                   \
-  g_mutex_unlock(&(comp->priv->update_pipeline_mutex));                        \
-  } G_STMT_END
+#define MAIN_CONTEXT_LOCK(comp) G_STMT_START {                       \
+  GST_LOG_OBJECT (comp, "Getting MAIN_CONTEXT_LOCK in thread %p",    \
+        g_thread_self());                                            \
+  g_mutex_lock(&((GnlComposition*)comp)->priv->mcontext_lock);    \
+  GST_LOG_OBJECT (comp, "Got MAIN_CONTEXT_LOCK in thread %p",        \
+        g_thread_self());                                            \
+} G_STMT_END
 
-#define SIGNAL_UPDATE_PIPELINE(comp) {                                         \
-  GST_INFO_OBJECT (comp, "signaling EOS from thread %p",                       \
-        g_thread_self());                                                      \
-  g_mutex_lock(&(comp->priv->update_pipeline_mutex));                          \
-  g_cond_signal(&(comp->priv->update_pipeline_cond));                          \
-  g_mutex_unlock(&(comp->priv->update_pipeline_mutex));                        \
-  } G_STMT_END
+#define MAIN_CONTEXT_UNLOCK(comp) G_STMT_START {                     \
+  g_mutex_unlock(&((GnlComposition*)comp)->priv->mcontext_lock);  \
+  GST_LOG_OBJECT (comp, "Unlocked MAIN_CONTEXT_LOCK in thread %p",   \
+        g_thread_self());                                            \
+} G_STMT_END
 
 
 
@@ -268,6 +271,109 @@ struct _GnlCompositionEntry
 };
 
 static void
+_remove_all_sources (GnlComposition * comp)
+{
+  GSource *source;
+
+  MAIN_CONTEXT_LOCK (comp);
+  while ((source =
+          g_main_context_find_source_by_user_data (comp->priv->mcontext,
+              comp))) {
+    g_source_destroy (source);
+  }
+  MAIN_CONTEXT_UNLOCK (comp);
+}
+
+static void
+iterate_main_context_func (GnlComposition * comp)
+{
+  if (comp->priv->running == FALSE) {
+    GST_DEBUG_OBJECT (comp, "Not running anymore");
+
+    return;
+  }
+
+  g_main_context_iteration (comp->priv->mcontext, TRUE);
+}
+
+static void
+_start_srcpad_task (GnlComposition * comp)
+{
+  GST_ERROR_OBJECT (comp, "Starting srcpad task");
+
+  comp->priv->running = TRUE;
+  gst_pad_start_task (GST_PAD (GNL_OBJECT_SRC (comp)),
+      (GstTaskFunction) iterate_main_context_func, comp, NULL);
+}
+
+static gboolean
+_stop_srcpad_task (GnlComposition * comp, GstEvent * flush_start)
+{
+  gboolean res = TRUE;
+  GnlObject *obj = GNL_OBJECT (comp);
+
+  GST_ERROR_OBJECT (comp, "%s srcpad task",
+      flush_start ? "Pausing" : "Stopping");
+
+  comp->priv->running = FALSE;
+
+  /*  Clean the stack of GSource set on the MainContext */
+  g_main_context_wakeup (comp->priv->mcontext);
+  _remove_all_sources (comp);
+  if (flush_start) {
+    res = gst_pad_push_event (obj->srcpad, flush_start);
+  }
+
+  gst_pad_stop_task (obj->srcpad);
+
+  return res;
+}
+
+static gboolean
+src_activate_mode (GstPad * pad,
+    GstObject * parent, GstPadMode mode, gboolean active)
+{
+  GnlComposition *comp = GNL_COMPOSITION (parent);
+
+  if (gst_ghost_pad_activate_mode_default (pad, parent, mode, active) == FALSE) {
+    GST_WARNING_OBJECT (pad, "Could not activate ghost pad");
+    return FALSE;
+  }
+
+  GST_ERROR ("ACTIVATING SRCPAD TASK %i", active);
+  if (active == TRUE) {
+    switch (mode) {
+      case GST_PAD_MODE_PUSH:
+      {
+        GST_INFO_OBJECT (pad, "Activating pad!");
+        _start_srcpad_task (comp);
+        return TRUE;
+      }
+      default:
+      {
+        GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
+        return FALSE;
+      }
+    }
+  }
+
+  /* deactivating */
+  GST_INFO_OBJECT (comp, "Deactivating srcpad");
+  _stop_srcpad_task (comp, FALSE);
+
+  return TRUE;
+}
+
+static void
+_add_update_gsource (GnlComposition * comp)
+{
+  MAIN_CONTEXT_LOCK (comp);
+  g_main_context_invoke (comp->priv->mcontext,
+      (GSourceFunc) update_pipeline_func, comp);
+  MAIN_CONTEXT_UNLOCK (comp);
+}
+
+static void
 gnl_composition_class_init (GnlCompositionClass * klass)
 {
   GObjectClass *gobject_class;
@@ -401,6 +507,8 @@ gnl_composition_init (GnlComposition * comp)
       g_direct_equal, NULL, (GDestroyNotify) hash_value_destroy);
 
   priv->deactivated_elements_state = GST_STATE_READY;
+  priv->mcontext = g_main_context_new ();
+  g_mutex_init (&priv->mcontext_lock);
 
   comp->priv = priv;
 
@@ -409,6 +517,8 @@ gnl_composition_init (GnlComposition * comp)
   priv->gnl_event_pad_func = GST_PAD_EVENTFUNC (GNL_OBJECT_SRC (comp));
   gst_pad_set_event_function (GNL_OBJECT_SRC (comp),
       GST_DEBUG_FUNCPTR (gnl_composition_event_handler));
+  gst_pad_set_activatemode_function (GNL_OBJECT_SRC (comp),
+      GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) src_activate_mode));
 }
 
 static void
@@ -698,6 +808,7 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
       gboolean reverse = (comp->priv->segment->rate < 0);
       gboolean should_check_objects = FALSE;
 
+      GST_ERROR ("EOS");
       COMP_FLUSHING_LOCK (comp);
       if (priv->flushing) {
         GST_DEBUG_OBJECT (comp, "flushing, bailing out");
@@ -736,7 +847,7 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
         return GST_PAD_PROBE_OK;
       }
 
-      SIGNAL_UPDATE_PIPELINE (comp);
+      _add_update_gsource (comp);
 
       retval = GST_PAD_PROBE_DROP;
     }
@@ -1747,59 +1858,55 @@ set_child_caps (GValue * item, GValue * ret G_GNUC_UNUSED, GnlObject * comp)
   return TRUE;
 }
 
-static gpointer
+static gboolean
 update_pipeline_func (GnlComposition * comp)
 {
-  while (comp->priv->running) {
-    GnlCompositionPrivate *priv;
-    gboolean reverse;
-
-    WAIT_FOR_UPDATE_PIPELINE (comp);
-
-    /* Set up a non-initial seek on segment_stop */
-    priv = comp->priv;
-    reverse = (priv->segment->rate < 0.0);
-    if (!reverse) {
-      GST_DEBUG_OBJECT (comp,
-          "Setting segment->start to segment_stop:%" GST_TIME_FORMAT,
-          GST_TIME_ARGS (priv->segment_stop));
-      priv->segment->start = priv->segment_stop;
-    } else {
-      GST_DEBUG_OBJECT (comp,
-          "Setting segment->stop to segment_start:%" GST_TIME_FORMAT,
-          GST_TIME_ARGS (priv->segment_start));
-      priv->segment->stop = priv->segment_start;
-    }
-
-    seek_handling (comp, TRUE, TRUE);
+  GnlCompositionPrivate *priv;
+  gboolean reverse;
 
-    if (!priv->current) {
-      /* If we're at the end, post SEGMENT_DONE, or push EOS */
-      GST_DEBUG_OBJECT (comp, "Nothing else to play");
+  /* Set up a non-initial seek on segment_stop */
+  priv = comp->priv;
+  reverse = (priv->segment->rate < 0.0);
+  if (!reverse) {
+    GST_DEBUG_OBJECT (comp,
+        "Setting segment->start to segment_stop:%" GST_TIME_FORMAT,
+        GST_TIME_ARGS (priv->segment_stop));
+    priv->segment->start = priv->segment_stop;
+  } else {
+    GST_DEBUG_OBJECT (comp,
+        "Setting segment->stop to segment_start:%" GST_TIME_FORMAT,
+        GST_TIME_ARGS (priv->segment_start));
+    priv->segment->stop = priv->segment_start;
+  }
 
-      if (!(priv->segment->flags & GST_SEEK_FLAG_SEGMENT)) {
-        GST_DEBUG_OBJECT (comp, "Real EOS should be sent now");
-      } else if (priv->segment->flags & GST_SEEK_FLAG_SEGMENT) {
-        gint64 epos;
+  seek_handling (comp, TRUE, TRUE);
 
-        if (GST_CLOCK_TIME_IS_VALID (priv->segment->stop))
-          epos = (MIN (priv->segment->stop, GNL_OBJECT_STOP (comp)));
-        else
-          epos = GNL_OBJECT_STOP (comp);
-
-        GST_LOG_OBJECT (comp, "Emitting segment done pos %" GST_TIME_FORMAT,
-            GST_TIME_ARGS (epos));
-        gst_element_post_message (GST_ELEMENT_CAST (comp),
-            gst_message_new_segment_done (GST_OBJECT (comp),
-                priv->segment->format, epos));
-        gst_pad_push_event (GNL_OBJECT (comp)->srcpad,
-            gst_event_new_segment_done (priv->segment->format, epos));
-      }
+  if (!priv->current) {
+    /* If we're at the end, post SEGMENT_DONE, or push EOS */
+    GST_DEBUG_OBJECT (comp, "Nothing else to play");
+
+    if (!(priv->segment->flags & GST_SEEK_FLAG_SEGMENT)) {
+      GST_DEBUG_OBJECT (comp, "Real EOS should be sent now");
+    } else if (priv->segment->flags & GST_SEEK_FLAG_SEGMENT) {
+      gint64 epos;
+
+      if (GST_CLOCK_TIME_IS_VALID (priv->segment->stop))
+        epos = (MIN (priv->segment->stop, GNL_OBJECT_STOP (comp)));
+      else
+        epos = GNL_OBJECT_STOP (comp);
+
+      GST_LOG_OBJECT (comp, "Emitting segment done pos %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (epos));
+      gst_element_post_message (GST_ELEMENT_CAST (comp),
+          gst_message_new_segment_done (GST_OBJECT (comp),
+              priv->segment->format, epos));
+      gst_pad_push_event (GNL_OBJECT (comp)->srcpad,
+          gst_event_new_segment_done (priv->segment->format, epos));
     }
-
   }
 
-  return NULL;
+
+  return G_SOURCE_REMOVE;
 }
 
 static GstStateChangeReturn
@@ -1813,12 +1920,6 @@ gnl_composition_change_state (GstElement * element, GstStateChange transition)
       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
 
   switch (transition) {
-    case GST_STATE_CHANGE_NULL_TO_READY:
-      comp->priv->running = TRUE;
-      comp->priv->update_pipeline_thread =
-          g_thread_new ("update_pipeline_thread",
-          (GThreadFunc) update_pipeline_func, comp);
-      break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
     {
       GstIterator *children;
@@ -1863,13 +1964,9 @@ gnl_composition_change_state (GstElement * element, GstStateChange transition)
     }
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
-      gnl_composition_reset (comp);
-      break;
+      /* Fallthrough */
     case GST_STATE_CHANGE_READY_TO_NULL:
       gnl_composition_reset (comp);
-      comp->priv->running = FALSE;
-      SIGNAL_UPDATE_PIPELINE (comp);
-      g_thread_join (comp->priv->update_pipeline_thread);
       break;
     default:
       break;
@@ -2420,7 +2517,7 @@ _activate_new_stack (GnlComposition * comp, gboolean forcing_flush)
       priv->segment_stop = GST_CLOCK_TIME_NONE;
     }
 
-    GST_INFO_OBJECT (comp, "Nothing else in the composition"
+    GST_ERROR_OBJECT (comp, "Nothing else in the composition"
         ", update 'worked'");
     return TRUE;
   }
@@ -2436,11 +2533,11 @@ _activate_new_stack (GnlComposition * comp, gboolean forcing_flush)
   pad = GNL_OBJECT_SRC (topelement);
   topentry = COMP_ENTRY (comp, topelement);
 
-  GST_DEBUG_OBJECT (comp,
+  GST_ERROR_OBJECT (comp,
       "We have a valid toplevel element pad %s:%s", GST_DEBUG_PAD_NAME (pad));
 
   /* Send seek event */
-  GST_LOG_OBJECT (comp, "sending seek event");
+  GST_ERROR_OBJECT (comp, "sending seek event");
   if (gst_pad_send_event (pad, event)) {
     /* Unconditionnaly set the ghostpad target to pad */
     GST_LOG_OBJECT (comp,
@@ -2461,7 +2558,7 @@ _activate_new_stack (GnlComposition * comp, gboolean forcing_flush)
     return FALSE;
   }
 
-  GST_LOG_OBJECT (comp, "New stack activated!");
+  GST_ERROR_OBJECT (comp, "New stack activated!");
   return TRUE;
 }
 
index 47800a1..f64dd78 100644 (file)
@@ -59,7 +59,7 @@ test_simplest_full (void)
 
   bus = gst_element_get_bus (GST_ELEMENT (pipeline));
 
-  GST_DEBUG ("Setting pipeline to PLAYING");
+  GST_ERROR ("Setting pipeline to PLAYING");
   ASSERT_OBJECT_REFCOUNT (source1, "source1", 1);
 
   fail_if (gst_element_set_state (GST_ELEMENT (pipeline),
@@ -76,7 +76,7 @@ test_simplest_full (void)
 
   fail_if (collect->expected_segments != NULL);
 
-  GST_DEBUG ("Resetted pipeline to READY");
+  GST_ERROR ("Resetted pipeline to READY");
 
   /* Expected segments */
   collect->expected_segments = g_list_append (collect->expected_segments,
@@ -84,7 +84,7 @@ test_simplest_full (void)
   collect->expected_base = 0;
   collect->gotsegment = FALSE;
 
-  GST_DEBUG ("Setting pipeline to PLAYING again");
+  GST_ERROR ("Setting pipeline to PLAYING again");
 
   fail_if (gst_element_set_state (GST_ELEMENT (pipeline),
           GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE);