From 550aaf522dec2e66bd6e306a14ae7bdb6f872064 Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Fri, 27 Jun 2014 12:15:10 +0200 Subject: [PATCH] composition: Use a GstPad task to run the update pipeline thread --- gnl/gnlcomposition.c | 255 ++++++++++++++++++++++++++++++++--------------- tests/check/gnl/simple.c | 6 +- 2 files changed, 179 insertions(+), 82 deletions(-) diff --git a/gnl/gnlcomposition.c b/gnl/gnlcomposition.c index c062581..091b359 100644 --- a/gnl/gnlcomposition.c +++ b/gnl/gnlcomposition.c @@ -1,6 +1,8 @@ /* GStreamer * Copyright (C) 2001 Wim Taymans * 2004-2008 Edward Hervey + * 2014 Mathieu Duponchelle + * 2014 Thibault Saunier * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -133,9 +135,12 @@ struct _GnlCompositionPrivate GstPadEventFunction gnl_event_pad_func; gboolean send_stream_start; - GThread *update_pipeline_thread; - GCond update_pipeline_cond; - GMutex update_pipeline_mutex; + GMainContext *mcontext; + /* Ensure that when we remove all sources from the maincontext + * we can not add any source, avoiding: + * "g_source_attach: assertion '!SOURCE_DESTROYED (source)' failed" */ + GMutex mcontext_lock; + gboolean reset_time; @@ -193,6 +198,7 @@ gnl_composition_event_handler (GstPad * ghostpad, GstObject * parent, static void compare_relink_single_node (GnlComposition * comp, GNode * node, GNode * oldstack); +static gboolean update_pipeline_func (GnlComposition * comp); /* COMP_REAL_START: actual position to start current playback at. */ @@ -236,22 +242,19 @@ compare_relink_single_node (GnlComposition * comp, GNode * node, g_mutex_unlock (&comp->priv->flushing_lock); \ } G_STMT_END -#define WAIT_FOR_UPDATE_PIPELINE(comp) G_STMT_START { \ - GST_INFO_OBJECT (comp, "waiting for EOS from thread %p", \ - g_thread_self()); \ - g_mutex_lock(&(comp->priv->update_pipeline_mutex)); \ - g_cond_wait(&(comp->priv->update_pipeline_cond), \ - &(comp->priv->update_pipeline_mutex)); \ - g_mutex_unlock(&(comp->priv->update_pipeline_mutex)); \ - } G_STMT_END +#define MAIN_CONTEXT_LOCK(comp) G_STMT_START { \ + GST_LOG_OBJECT (comp, "Getting MAIN_CONTEXT_LOCK in thread %p", \ + g_thread_self()); \ + g_mutex_lock(&((GnlComposition*)comp)->priv->mcontext_lock); \ + GST_LOG_OBJECT (comp, "Got MAIN_CONTEXT_LOCK in thread %p", \ + g_thread_self()); \ +} G_STMT_END -#define SIGNAL_UPDATE_PIPELINE(comp) { \ - GST_INFO_OBJECT (comp, "signaling EOS from thread %p", \ - g_thread_self()); \ - g_mutex_lock(&(comp->priv->update_pipeline_mutex)); \ - g_cond_signal(&(comp->priv->update_pipeline_cond)); \ - g_mutex_unlock(&(comp->priv->update_pipeline_mutex)); \ - } G_STMT_END +#define MAIN_CONTEXT_UNLOCK(comp) G_STMT_START { \ + g_mutex_unlock(&((GnlComposition*)comp)->priv->mcontext_lock); \ + GST_LOG_OBJECT (comp, "Unlocked MAIN_CONTEXT_LOCK in thread %p", \ + g_thread_self()); \ +} G_STMT_END @@ -268,6 +271,109 @@ struct _GnlCompositionEntry }; static void +_remove_all_sources (GnlComposition * comp) +{ + GSource *source; + + MAIN_CONTEXT_LOCK (comp); + while ((source = + g_main_context_find_source_by_user_data (comp->priv->mcontext, + comp))) { + g_source_destroy (source); + } + MAIN_CONTEXT_UNLOCK (comp); +} + +static void +iterate_main_context_func (GnlComposition * comp) +{ + if (comp->priv->running == FALSE) { + GST_DEBUG_OBJECT (comp, "Not running anymore"); + + return; + } + + g_main_context_iteration (comp->priv->mcontext, TRUE); +} + +static void +_start_srcpad_task (GnlComposition * comp) +{ + GST_ERROR_OBJECT (comp, "Starting srcpad task"); + + comp->priv->running = TRUE; + gst_pad_start_task (GST_PAD (GNL_OBJECT_SRC (comp)), + (GstTaskFunction) iterate_main_context_func, comp, NULL); +} + +static gboolean +_stop_srcpad_task (GnlComposition * comp, GstEvent * flush_start) +{ + gboolean res = TRUE; + GnlObject *obj = GNL_OBJECT (comp); + + GST_ERROR_OBJECT (comp, "%s srcpad task", + flush_start ? "Pausing" : "Stopping"); + + comp->priv->running = FALSE; + + /* Clean the stack of GSource set on the MainContext */ + g_main_context_wakeup (comp->priv->mcontext); + _remove_all_sources (comp); + if (flush_start) { + res = gst_pad_push_event (obj->srcpad, flush_start); + } + + gst_pad_stop_task (obj->srcpad); + + return res; +} + +static gboolean +src_activate_mode (GstPad * pad, + GstObject * parent, GstPadMode mode, gboolean active) +{ + GnlComposition *comp = GNL_COMPOSITION (parent); + + if (gst_ghost_pad_activate_mode_default (pad, parent, mode, active) == FALSE) { + GST_WARNING_OBJECT (pad, "Could not activate ghost pad"); + return FALSE; + } + + GST_ERROR ("ACTIVATING SRCPAD TASK %i", active); + if (active == TRUE) { + switch (mode) { + case GST_PAD_MODE_PUSH: + { + GST_INFO_OBJECT (pad, "Activating pad!"); + _start_srcpad_task (comp); + return TRUE; + } + default: + { + GST_ERROR_OBJECT (pad, "Only supported mode is PUSH"); + return FALSE; + } + } + } + + /* deactivating */ + GST_INFO_OBJECT (comp, "Deactivating srcpad"); + _stop_srcpad_task (comp, FALSE); + + return TRUE; +} + +static void +_add_update_gsource (GnlComposition * comp) +{ + MAIN_CONTEXT_LOCK (comp); + g_main_context_invoke (comp->priv->mcontext, + (GSourceFunc) update_pipeline_func, comp); + MAIN_CONTEXT_UNLOCK (comp); +} + +static void gnl_composition_class_init (GnlCompositionClass * klass) { GObjectClass *gobject_class; @@ -401,6 +507,8 @@ gnl_composition_init (GnlComposition * comp) g_direct_equal, NULL, (GDestroyNotify) hash_value_destroy); priv->deactivated_elements_state = GST_STATE_READY; + priv->mcontext = g_main_context_new (); + g_mutex_init (&priv->mcontext_lock); comp->priv = priv; @@ -409,6 +517,8 @@ gnl_composition_init (GnlComposition * comp) priv->gnl_event_pad_func = GST_PAD_EVENTFUNC (GNL_OBJECT_SRC (comp)); gst_pad_set_event_function (GNL_OBJECT_SRC (comp), GST_DEBUG_FUNCPTR (gnl_composition_event_handler)); + gst_pad_set_activatemode_function (GNL_OBJECT_SRC (comp), + GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) src_activate_mode)); } static void @@ -698,6 +808,7 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED, gboolean reverse = (comp->priv->segment->rate < 0); gboolean should_check_objects = FALSE; + GST_ERROR ("EOS"); COMP_FLUSHING_LOCK (comp); if (priv->flushing) { GST_DEBUG_OBJECT (comp, "flushing, bailing out"); @@ -736,7 +847,7 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED, return GST_PAD_PROBE_OK; } - SIGNAL_UPDATE_PIPELINE (comp); + _add_update_gsource (comp); retval = GST_PAD_PROBE_DROP; } @@ -1747,59 +1858,55 @@ set_child_caps (GValue * item, GValue * ret G_GNUC_UNUSED, GnlObject * comp) return TRUE; } -static gpointer +static gboolean update_pipeline_func (GnlComposition * comp) { - while (comp->priv->running) { - GnlCompositionPrivate *priv; - gboolean reverse; - - WAIT_FOR_UPDATE_PIPELINE (comp); - - /* Set up a non-initial seek on segment_stop */ - priv = comp->priv; - reverse = (priv->segment->rate < 0.0); - if (!reverse) { - GST_DEBUG_OBJECT (comp, - "Setting segment->start to segment_stop:%" GST_TIME_FORMAT, - GST_TIME_ARGS (priv->segment_stop)); - priv->segment->start = priv->segment_stop; - } else { - GST_DEBUG_OBJECT (comp, - "Setting segment->stop to segment_start:%" GST_TIME_FORMAT, - GST_TIME_ARGS (priv->segment_start)); - priv->segment->stop = priv->segment_start; - } - - seek_handling (comp, TRUE, TRUE); + GnlCompositionPrivate *priv; + gboolean reverse; - if (!priv->current) { - /* If we're at the end, post SEGMENT_DONE, or push EOS */ - GST_DEBUG_OBJECT (comp, "Nothing else to play"); + /* Set up a non-initial seek on segment_stop */ + priv = comp->priv; + reverse = (priv->segment->rate < 0.0); + if (!reverse) { + GST_DEBUG_OBJECT (comp, + "Setting segment->start to segment_stop:%" GST_TIME_FORMAT, + GST_TIME_ARGS (priv->segment_stop)); + priv->segment->start = priv->segment_stop; + } else { + GST_DEBUG_OBJECT (comp, + "Setting segment->stop to segment_start:%" GST_TIME_FORMAT, + GST_TIME_ARGS (priv->segment_start)); + priv->segment->stop = priv->segment_start; + } - if (!(priv->segment->flags & GST_SEEK_FLAG_SEGMENT)) { - GST_DEBUG_OBJECT (comp, "Real EOS should be sent now"); - } else if (priv->segment->flags & GST_SEEK_FLAG_SEGMENT) { - gint64 epos; + seek_handling (comp, TRUE, TRUE); - if (GST_CLOCK_TIME_IS_VALID (priv->segment->stop)) - epos = (MIN (priv->segment->stop, GNL_OBJECT_STOP (comp))); - else - epos = GNL_OBJECT_STOP (comp); - - GST_LOG_OBJECT (comp, "Emitting segment done pos %" GST_TIME_FORMAT, - GST_TIME_ARGS (epos)); - gst_element_post_message (GST_ELEMENT_CAST (comp), - gst_message_new_segment_done (GST_OBJECT (comp), - priv->segment->format, epos)); - gst_pad_push_event (GNL_OBJECT (comp)->srcpad, - gst_event_new_segment_done (priv->segment->format, epos)); - } + if (!priv->current) { + /* If we're at the end, post SEGMENT_DONE, or push EOS */ + GST_DEBUG_OBJECT (comp, "Nothing else to play"); + + if (!(priv->segment->flags & GST_SEEK_FLAG_SEGMENT)) { + GST_DEBUG_OBJECT (comp, "Real EOS should be sent now"); + } else if (priv->segment->flags & GST_SEEK_FLAG_SEGMENT) { + gint64 epos; + + if (GST_CLOCK_TIME_IS_VALID (priv->segment->stop)) + epos = (MIN (priv->segment->stop, GNL_OBJECT_STOP (comp))); + else + epos = GNL_OBJECT_STOP (comp); + + GST_LOG_OBJECT (comp, "Emitting segment done pos %" GST_TIME_FORMAT, + GST_TIME_ARGS (epos)); + gst_element_post_message (GST_ELEMENT_CAST (comp), + gst_message_new_segment_done (GST_OBJECT (comp), + priv->segment->format, epos)); + gst_pad_push_event (GNL_OBJECT (comp)->srcpad, + gst_event_new_segment_done (priv->segment->format, epos)); } - } - return NULL; + + return G_SOURCE_REMOVE; } static GstStateChangeReturn @@ -1813,12 +1920,6 @@ gnl_composition_change_state (GstElement * element, GstStateChange transition) gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); switch (transition) { - case GST_STATE_CHANGE_NULL_TO_READY: - comp->priv->running = TRUE; - comp->priv->update_pipeline_thread = - g_thread_new ("update_pipeline_thread", - (GThreadFunc) update_pipeline_func, comp); - break; case GST_STATE_CHANGE_READY_TO_PAUSED: { GstIterator *children; @@ -1863,13 +1964,9 @@ gnl_composition_change_state (GstElement * element, GstStateChange transition) } break; case GST_STATE_CHANGE_PAUSED_TO_READY: - gnl_composition_reset (comp); - break; + /* Fallthrough */ case GST_STATE_CHANGE_READY_TO_NULL: gnl_composition_reset (comp); - comp->priv->running = FALSE; - SIGNAL_UPDATE_PIPELINE (comp); - g_thread_join (comp->priv->update_pipeline_thread); break; default: break; @@ -2420,7 +2517,7 @@ _activate_new_stack (GnlComposition * comp, gboolean forcing_flush) priv->segment_stop = GST_CLOCK_TIME_NONE; } - GST_INFO_OBJECT (comp, "Nothing else in the composition" + GST_ERROR_OBJECT (comp, "Nothing else in the composition" ", update 'worked'"); return TRUE; } @@ -2436,11 +2533,11 @@ _activate_new_stack (GnlComposition * comp, gboolean forcing_flush) pad = GNL_OBJECT_SRC (topelement); topentry = COMP_ENTRY (comp, topelement); - GST_DEBUG_OBJECT (comp, + GST_ERROR_OBJECT (comp, "We have a valid toplevel element pad %s:%s", GST_DEBUG_PAD_NAME (pad)); /* Send seek event */ - GST_LOG_OBJECT (comp, "sending seek event"); + GST_ERROR_OBJECT (comp, "sending seek event"); if (gst_pad_send_event (pad, event)) { /* Unconditionnaly set the ghostpad target to pad */ GST_LOG_OBJECT (comp, @@ -2461,7 +2558,7 @@ _activate_new_stack (GnlComposition * comp, gboolean forcing_flush) return FALSE; } - GST_LOG_OBJECT (comp, "New stack activated!"); + GST_ERROR_OBJECT (comp, "New stack activated!"); return TRUE; } diff --git a/tests/check/gnl/simple.c b/tests/check/gnl/simple.c index 47800a1..f64dd78 100644 --- a/tests/check/gnl/simple.c +++ b/tests/check/gnl/simple.c @@ -59,7 +59,7 @@ test_simplest_full (void) bus = gst_element_get_bus (GST_ELEMENT (pipeline)); - GST_DEBUG ("Setting pipeline to PLAYING"); + GST_ERROR ("Setting pipeline to PLAYING"); ASSERT_OBJECT_REFCOUNT (source1, "source1", 1); fail_if (gst_element_set_state (GST_ELEMENT (pipeline), @@ -76,7 +76,7 @@ test_simplest_full (void) fail_if (collect->expected_segments != NULL); - GST_DEBUG ("Resetted pipeline to READY"); + GST_ERROR ("Resetted pipeline to READY"); /* Expected segments */ collect->expected_segments = g_list_append (collect->expected_segments, @@ -84,7 +84,7 @@ test_simplest_full (void) collect->expected_base = 0; collect->gotsegment = FALSE; - GST_DEBUG ("Setting pipeline to PLAYING again"); + GST_ERROR ("Setting pipeline to PLAYING again"); fail_if (gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE); -- 2.7.4