GstPadEventFunction nle_event_pad_func;
gboolean send_stream_start;
- GMainContext *mcontext;
- /* Ensure that when we remove all sources from the maincontext
- * we can not add any source, avoiding:
- * "g_source_attach: assertion '!SOURCE_DESTROYED (source)' failed" */
- GMutex mcontext_lock;
- GList *gsources;
- GList *update_gsources;
- GList *seek_gsources;
+ /* Protect the actions list */
+ GMutex actions_lock;
+ GCond actions_cond;
+ GList *actions;
gboolean running;
gboolean initialized;
gboolean tearing_down_stack;
};
+typedef struct _Action
+{
+ GCClosure closure;
+ gint priority;
+} Action;
+
+#define ACTION_CALLBACK(__action) (((GCClosure*) (__action))->callback)
+
static guint _signals[LAST_SIGNAL] = { 0 };
static GParamSpec *nleobject_properties[NLEOBJECT_PROP_LAST];
GstEvent * event);
static void _relink_single_node (NleComposition * comp, GNode * node,
GstEvent * toplevel_seek);
-static gboolean _update_pipeline_func (UpdateCompositionData * ucompo);
-static gboolean _commit_func (UpdateCompositionData * ucompo);
+static void _update_pipeline_func (NleComposition * comp,
+ UpdateCompositionData * ucompo);
+static void _commit_func (NleComposition * comp,
+ UpdateCompositionData * ucompo);
static GstEvent *get_new_seek_event (NleComposition * comp, gboolean initial,
gboolean updatestoponly);
-static gboolean
-_nle_composition_add_object (NleComposition * comp, NleObject * object);
-static gboolean
-_nle_composition_remove_object (NleComposition * comp, NleObject * object);
+static gboolean _nle_composition_add_object (NleComposition * comp,
+ NleObject * object);
+static gboolean _nle_composition_remove_object (NleComposition * comp,
+ NleObject * object);
static void _deactivate_stack (NleComposition * comp,
gboolean flush_downstream);
static gboolean _set_real_eos_seqnum_from_seek (NleComposition * comp,
GstEvent * event);
-static gboolean _emit_commited_signal_func (NleComposition * comp);
+static void _emit_commited_signal_func (NleComposition * comp, gpointer udata);
static void _restart_task (UpdateCompositionData * ucompo);
+static void
+_add_action (NleComposition * comp, GCallback func, gpointer data,
+ gint priority);
/* COMP_REAL_START: actual position to start current playback at. */
(MIN (comp->priv->segment->stop, NLE_OBJECT_STOP (comp))) : \
NLE_OBJECT_STOP (comp))
-#define MAIN_CONTEXT_LOCK(comp) G_STMT_START { \
- GST_LOG_OBJECT (comp, "Getting MAIN_CONTEXT_LOCK in thread %p", \
+#define ACTIONS_LOCK(comp) G_STMT_START { \
+ GST_LOG_OBJECT (comp, "Getting ACTIONS_LOCK in thread %p", \
g_thread_self()); \
- g_mutex_lock(&((NleComposition*)comp)->priv->mcontext_lock); \
- GST_LOG_OBJECT (comp, "Got MAIN_CONTEXT_LOCK in thread %p", \
+ g_mutex_lock(&((NleComposition*)comp)->priv->actions_lock); \
+ GST_LOG_OBJECT (comp, "Got ACTIONS_LOCK in thread %p", \
g_thread_self()); \
} G_STMT_END
-#define MAIN_CONTEXT_UNLOCK(comp) G_STMT_START { \
- g_mutex_unlock(&((NleComposition*)comp)->priv->mcontext_lock); \
- GST_LOG_OBJECT (comp, "Unlocked MAIN_CONTEXT_LOCK in thread %p", \
+#define ACTIONS_UNLOCK(comp) G_STMT_START { \
+ g_mutex_unlock(&((NleComposition*)comp)->priv->actions_lock); \
+ GST_LOG_OBJECT (comp, "Unlocked ACTIONS_LOCK in thread %p", \
g_thread_self()); \
} G_STMT_END
+#define WAIT_FOR_AN_ACTION(comp) G_STMT_START { \
+ GST_LOG_OBJECT (comp, "Waiting for an action in thread %p", \
+ g_thread_self()); \
+ g_cond_wait(&((NleComposition*)comp)->priv->actions_cond, \
+ &((NleComposition*)comp)->priv->actions_lock); \
+ GST_LOG_OBJECT (comp, "Done WAITING for an action in thread %p", \
+ g_thread_self()); \
+} G_STMT_END
+
+#define SIGNAL_NEW_ACTION(comp) G_STMT_START { \
+ GST_LOG_OBJECT (comp, "Signalling new action from thread %p", \
+ g_thread_self()); \
+ g_cond_signal(&((NleComposition*)comp)->priv->actions_cond); \
+} G_STMT_END
+
#define GET_TASK_LOCK(comp) (&(NLE_COMPOSITION(comp)->task_rec_lock))
static inline gboolean
}
static void
-_destroy_gsource (GSource * source)
+_remove_actions_for_type (NleComposition * comp, GCallback callback)
{
- g_source_destroy (source);
- g_source_unref (source);
-}
+ GList *tmp;
-static void
-_remove_all_update_sources (NleComposition * comp)
-{
- MAIN_CONTEXT_LOCK (comp);
- g_list_free_full (comp->priv->update_gsources,
- (GDestroyNotify) _destroy_gsource);
- comp->priv->update_gsources = NULL;
- MAIN_CONTEXT_UNLOCK (comp);
-}
+ ACTIONS_LOCK (comp);
+ for (tmp = comp->priv->actions; tmp; tmp = tmp->next) {
+ Action *act = tmp->data;
+
+ if (ACTION_CALLBACK (act) == callback) {
+ g_closure_unref ((GClosure *) act);
+ comp->priv->actions = g_list_delete_link (comp->priv->actions, tmp);
+ }
+ }
+ ACTIONS_UNLOCK (comp);
-static void
-_remove_all_seek_sources (NleComposition * comp)
-{
- MAIN_CONTEXT_LOCK (comp);
- g_list_free_full (comp->priv->seek_gsources,
- (GDestroyNotify) _destroy_gsource);
- comp->priv->seek_gsources = NULL;
- MAIN_CONTEXT_UNLOCK (comp);
}
static void
-iterate_main_context_func (NleComposition * comp)
+_execute_actions (NleComposition * comp)
{
- if (comp->priv->running == FALSE) {
+ NleCompositionPrivate *priv = comp->priv;
+
+ ACTIONS_LOCK (comp);
+ if (priv->running == FALSE) {
GST_DEBUG_OBJECT (comp, "Not running anymore");
+ ACTIONS_UNLOCK (comp);
return;
}
- g_main_context_iteration (comp->priv->mcontext, TRUE);
+ if (priv->actions == NULL)
+ WAIT_FOR_AN_ACTION (comp);
+
+ if (comp->priv->running == FALSE) {
+ GST_INFO_OBJECT (comp, "Done waiting but not running anymore");
+
+ ACTIONS_UNLOCK (comp);
+ return;
+ }
+
+ if (priv->actions) {
+ GValue params[1] = { G_VALUE_INIT };
+ GList *lact;
+
+ g_value_init (¶ms[0], G_TYPE_OBJECT);
+ g_value_set_object (¶ms[0], comp);
+
+ lact = priv->actions;
+ priv->actions = priv->actions->next;
+ ACTIONS_UNLOCK (comp);
+
+ GST_INFO_OBJECT (comp, "Invoking %p:%s",
+ lact->data, GST_DEBUG_FUNCPTR_NAME ((ACTION_CALLBACK (lact->data))));
+ g_closure_invoke (lact->data, NULL, 1, params, NULL);
+ } else {
+ ACTIONS_UNLOCK (comp);
+ }
}
static void
{
GstTask *task;
+ ACTIONS_LOCK (comp);
comp->priv->running = TRUE;
+ ACTIONS_UNLOCK (comp);
GST_OBJECT_LOCK (comp);
task = comp->task;
if (task == NULL) {
- task =
- gst_task_new ((GstTaskFunction) iterate_main_context_func, comp, NULL);
+ task = gst_task_new ((GstTaskFunction) _execute_actions, comp, NULL);
gst_task_set_lock (task, GET_TASK_LOCK (comp));
GST_INFO_OBJECT (comp, "created task %p", task);
comp->task = task;
GST_INFO_OBJECT (comp, "Stoping children management task");
+ ACTIONS_LOCK (comp);
comp->priv->running = FALSE;
- /* Clean the stack of GSource set on the MainContext */
- g_main_context_wakeup (comp->priv->mcontext);
+ /* Make sure we do not stay blocked trying to execute an action */
+ SIGNAL_NEW_ACTION (comp);
+ ACTIONS_UNLOCK (comp);
GST_DEBUG_OBJECT (comp, "stop task");
}
static void
-_free_seek_data (SeekData * seekd)
-{
- gst_event_unref (seekd->event);
- g_slice_free (SeekData, seekd);
-}
-
-static gboolean
-_seek_pipeline_func (SeekData * seekd)
+_seek_pipeline_func (NleComposition * comp, SeekData * seekd)
{
gdouble rate;
GstFormat format;
GstSeekFlags flags;
GstSeekType cur_type, stop_type;
gint64 cur, stop;
- NleCompositionPrivate *priv = seekd->comp->priv;
+ NleCompositionPrivate *priv = comp->priv;
gst_event_parse_seek (seekd->event, &rate, &format, &flags,
&cur_type, &cur, &stop_type, &stop);
" Not seeking", GST_TIME_ARGS (priv->segment->start),
GST_TIME_ARGS (NLE_OBJECT_STOP (seekd->comp)));
GST_FIXME_OBJECT (seekd->comp, "HANDLE error async!");
- goto beach;
+ return;
}
_post_start_composition_update (seekd->comp,
_post_start_composition_update_done (seekd->comp,
gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK);
-
-beach:
- return G_SOURCE_REMOVE;
-}
-
-static void
-_add_gsource (NleComposition * comp, GSourceFunc func,
- gpointer data, GDestroyNotify destroy, gint priority)
-{
- GSource *source;
- NleCompositionPrivate *priv = comp->priv;
-
- GST_INFO_OBJECT (comp, "Adding GSource for function: %s",
- GST_DEBUG_FUNCPTR_NAME (func));
-
- MAIN_CONTEXT_LOCK (comp);
- source = g_idle_source_new ();
- g_source_set_callback (source, func, data, destroy);
- g_source_set_priority (source, priority);
-
- if (func == (GSourceFunc) _update_pipeline_func)
- priv->update_gsources = g_list_prepend (priv->update_gsources, source);
- else if (func == (GSourceFunc) _seek_pipeline_func)
- priv->seek_gsources = g_list_prepend (priv->seek_gsources, source);
- else
- priv->gsources = g_list_prepend (priv->gsources, source);
-
- g_source_attach (source, priv->mcontext);
- MAIN_CONTEXT_UNLOCK (comp);
-}
-
-static void
-_add_seek_gsource (NleComposition * comp, GstEvent * event)
-{
- SeekData *seekd = g_slice_new0 (SeekData);
-
- GST_DEBUG_OBJECT (comp, "Adding GSource");
-
- seekd->comp = comp;
- seekd->event = event;
-
- comp->priv->next_eos_seqnum = 0;
- comp->priv->real_eos_seqnum = 0;
- _add_gsource (comp, (GSourceFunc) _seek_pipeline_func, seekd,
- (GDestroyNotify) _free_seek_data, G_PRIORITY_DEFAULT);
}
/* Must be called with OBJECTS_LOCK taken */
NleCompositionPrivate *priv = comp->priv;
priv->next_base_time = 0;
- GST_ERROR_OBJECT (comp, "Commiting state");
_process_pending_entries (comp);
}
static gboolean
-_initialize_stack_func (UpdateCompositionData * ucompo)
+_initialize_stack_func (NleComposition * comp, UpdateCompositionData * ucompo)
{
- NleComposition *comp = ucompo->comp;
NleCompositionPrivate *priv = comp->priv;
_commit_all_values (comp);
}
static void
-_free_child_io_data (gpointer childio)
+_remove_object_func (NleComposition * comp, ChildIOData * childio)
{
- g_slice_free (ChildIOData, childio);
-}
-
-static gboolean
-_remove_object_func (ChildIOData * childio)
-{
- NleComposition *comp = childio->comp;
NleObject *object = childio->object;
NleCompositionPrivate *priv = comp->priv;
" for addition, removing it from the addition list", object);
g_hash_table_remove (priv->pending_io, object);
- return G_SOURCE_REMOVE;
+ return;
}
GST_ERROR_OBJECT (comp, "Object %" GST_PTR_FORMAT " is "
" not in the composition", object);
- return G_SOURCE_REMOVE;
+ return;
}
if (in_pending_io) {
GST_WARNING_OBJECT (comp, "Object %" GST_PTR_FORMAT " is already marked"
" for removal", object);
- return G_SOURCE_REMOVE;
+ return;
}
g_hash_table_add (priv->pending_io, object);
- return G_SOURCE_REMOVE;
+ return;
}
static void
-_add_remove_object_gsource (NleComposition * comp, NleObject * object)
+_add_remove_object_action (NleComposition * comp, NleObject * object)
{
ChildIOData *childio = g_slice_new0 (ChildIOData);
- GST_DEBUG_OBJECT (comp, "Adding GSource");
+ GST_DEBUG_OBJECT (comp, "Adding Action");
childio->comp = comp;
childio->object = object;
- _add_gsource (comp, (GSourceFunc) _remove_object_func,
- childio, _free_child_io_data, G_PRIORITY_DEFAULT);
+ _add_action (comp, G_CALLBACK (_remove_object_func),
+ childio, G_PRIORITY_DEFAULT);
}
-static gboolean
-_add_object_func (ChildIOData * childio)
+static void
+_add_object_func (NleComposition * comp, ChildIOData * childio)
{
- NleComposition *comp = childio->comp;
NleObject *object = childio->object;
NleCompositionPrivate *priv = comp->priv;
NleObject *in_pending_io;
GST_ERROR_OBJECT (comp, "Object %" GST_PTR_FORMAT " is "
" already in the composition", object);
- return G_SOURCE_REMOVE;
+ return;
}
if (in_pending_io) {
GST_WARNING_OBJECT (comp, "Object %" GST_PTR_FORMAT " is already marked"
" for addition", object);
- return G_SOURCE_REMOVE;
+ return;
}
g_hash_table_add (priv->pending_io, object);
-
- return G_SOURCE_REMOVE;
}
static void
-_add_add_object_gsource (NleComposition * comp, NleObject * object)
+_add_add_object_action (NleComposition * comp, NleObject * object)
{
ChildIOData *childio = g_slice_new0 (ChildIOData);
- GST_DEBUG_OBJECT (comp, "Adding GSource");
+ GST_DEBUG_OBJECT (comp, "Adding Action");
childio->comp = comp;
childio->object = object;
- _add_gsource (comp, (GSourceFunc) _add_object_func, childio,
- _free_child_io_data, G_PRIORITY_DEFAULT);
+ _add_action (comp, G_CALLBACK (_add_object_func), childio,
+ G_PRIORITY_DEFAULT);
}
static void
}
static void
-_add_update_compo_gsource (NleComposition * comp,
- GSourceFunc func, NleUpdateStackReason reason)
+_free_action (Action * action, gpointer udata)
+{
+ if (ACTION_CALLBACK (action) == _seek_pipeline_func) {
+ SeekData *seekd = (SeekData *) udata;
+
+ gst_event_unref (seekd->event);
+ g_slice_free (SeekData, seekd);
+ } else if (ACTION_CALLBACK (action) == _remove_object_func ||
+ ACTION_CALLBACK (action) == _add_object_func) {
+ g_slice_free (ChildIOData, udata);
+ } else if (ACTION_CALLBACK (action) == _update_pipeline_func ||
+ ACTION_CALLBACK (action) == _commit_func ||
+ ACTION_CALLBACK (action) == _initialize_stack_func) {
+ g_slice_free (UpdateCompositionData, udata);
+ }
+}
+
+static void
+_add_action (NleComposition * comp, GCallback func,
+ gpointer data, gint priority)
+{
+ Action *action;
+ NleCompositionPrivate *priv = comp->priv;
+
+
+ action = (Action *) g_closure_new_simple (sizeof (Action), data);
+ g_closure_add_finalize_notifier ((GClosure *) action, data,
+ (GClosureNotify) _free_action);
+ ACTION_CALLBACK (action) = func;
+ action->priority = priority;
+ g_closure_set_marshal ((GClosure *) action, g_cclosure_marshal_VOID__VOID);
+
+ ACTIONS_LOCK (comp);
+ GST_INFO_OBJECT (comp, "Adding Action for function: %p:%s",
+ action, GST_DEBUG_FUNCPTR_NAME (func));
+
+ if (func == G_CALLBACK (_emit_commited_signal_func))
+ priv->actions = g_list_prepend (priv->actions, action);
+ else
+ priv->actions = g_list_append (priv->actions, action);
+
+ SIGNAL_NEW_ACTION (comp);
+ ACTIONS_UNLOCK (comp);
+}
+
+static void
+_add_seek_action (NleComposition * comp, GstEvent * event)
+{
+ SeekData *seekd = g_slice_new0 (SeekData);
+
+ GST_DEBUG_OBJECT (comp, "Adding Action");
+
+ seekd->comp = comp;
+ seekd->event = event;
+
+ comp->priv->next_eos_seqnum = 0;
+ comp->priv->real_eos_seqnum = 0;
+ _add_action (comp, G_CALLBACK (_seek_pipeline_func), seekd,
+ G_PRIORITY_DEFAULT);
+}
+
+static void
+_remove_update_actions (NleComposition * comp)
+{
+ _remove_actions_for_type (comp, G_CALLBACK (_update_pipeline_func));
+}
+
+static void
+_remove_seek_actions (NleComposition * comp)
+{
+ _remove_actions_for_type (comp, G_CALLBACK (_seek_pipeline_func));
+}
+
+static void
+_add_update_compo_action (NleComposition * comp,
+ GCallback callback, NleUpdateStackReason reason)
{
UpdateCompositionData *ucompo = g_slice_new0 (UpdateCompositionData);
GST_INFO_OBJECT (comp, "Updating because: %s -- Setting seqnum: %i",
UPDATE_PIPELINE_REASONS[reason], ucompo->seqnum);
- _add_gsource (comp, (GSourceFunc) func, ucompo,
- _free_update_compo_data, G_PRIORITY_DEFAULT);
+ _add_action (comp, callback, ucompo, G_PRIORITY_DEFAULT);
}
static void
priv->objects_hash = g_hash_table_new (g_direct_hash, g_direct_equal);
- priv->mcontext = g_main_context_new ();
- g_main_context_set_dispatches_per_iteration (priv->mcontext, 1);
- g_mutex_init (&priv->mcontext_lock);
+ g_mutex_init (&priv->actions_lock);
+ g_cond_init (&priv->actions_cond);
priv->pending_io = g_hash_table_new (g_direct_hash, g_direct_equal);
G_OBJECT_CLASS (parent_class)->finalize (object);
- g_mutex_clear (&priv->mcontext_lock);
+ g_mutex_clear (&priv->actions_lock);
+ g_cond_clear (&priv->actions_cond);
}
/* signal_duration_change
}
if (priv->next_eos_seqnum == seqnum)
- _add_update_compo_gsource (comp, (GSourceFunc) _update_pipeline_func,
+ _add_update_compo_action (comp, G_CALLBACK (_update_pipeline_func),
COMP_UPDATE_STACK_ON_EOS);
else
GST_INFO_OBJECT (comp,
static gboolean
nle_composition_commit_func (NleObject * object, gboolean recurse)
{
- _add_update_compo_gsource (NLE_COMPOSITION (object),
- (GSourceFunc) _commit_func, COMP_UPDATE_STACK_ON_COMMIT);
+ _add_update_compo_action (NLE_COMPOSITION (object),
+ G_CALLBACK (_commit_func), COMP_UPDATE_STACK_ON_COMMIT);
return TRUE;
}
gst_event_set_seqnum (toplevel_seek, seqnum);
_set_real_eos_seqnum_from_seek (comp, toplevel_seek);
- _remove_all_update_sources (comp);
+ _remove_update_actions (comp);
_seek_current_stack (comp, toplevel_seek,
_have_to_flush_downstream (update_stack_reason));
update_operations_base_time (comp, !(comp->priv->segment->rate >= 0.0));
case GST_EVENT_SEEK:
{
if (!priv->seeking_itself) {
- _add_seek_gsource (comp, event);
+ _add_seek_action (comp, event);
event = NULL;
GST_FIXME_OBJECT (comp, "HANDLE seeking errors!");
comp->priv->tearing_down_stack = FALSE;
}
-static gboolean
-_emit_commited_signal_func (NleComposition * comp)
+static void
+_emit_commited_signal_func (NleComposition * comp, gpointer udata)
{
GST_INFO_OBJECT (comp, "Emiting COMMITED now that the stack " "is ready");
g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
-
- return G_SOURCE_REMOVE;
}
static void
UPDATE_PIPELINE_REASONS[ucompo->reason]);
if (ucompo->reason == COMP_UPDATE_STACK_ON_COMMIT)
- _add_gsource (comp, (GSourceFunc) _emit_commited_signal_func, comp, NULL,
+ _add_action (comp, G_CALLBACK (_emit_commited_signal_func), comp,
G_PRIORITY_HIGH);
comp->priv->awaited_caps_seqnum = 0;
return GST_PAD_PROBE_OK;
}
-static gboolean
-_commit_func (UpdateCompositionData * ucompo)
+static void
+_commit_func (NleComposition * comp, UpdateCompositionData * ucompo)
{
GstClockTime curpos;
- NleComposition *comp = ucompo->comp;
NleCompositionPrivate *priv = comp->priv;
_post_start_composition_update (comp, ucompo->seqnum, ucompo->reason);
g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, FALSE);
_post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
- return G_SOURCE_REMOVE;
+ return;
}
if (priv->initialized == FALSE) {
}
_post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
-
- return G_SOURCE_REMOVE;
}
-static gboolean
-_update_pipeline_func (UpdateCompositionData * ucompo)
+static void
+_update_pipeline_func (NleComposition * comp, UpdateCompositionData * ucompo)
{
gboolean reverse;
- NleComposition *comp = ucompo->comp;
NleCompositionPrivate *priv = comp->priv;
_post_start_composition_update (comp, ucompo->seqnum, ucompo->reason);
}
_post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
-
- return G_SOURCE_REMOVE;
}
static void
GST_DEBUG_OBJECT (comp,
"Setting all children to READY and locking their state");
- _add_update_compo_gsource (comp, (GSourceFunc) _initialize_stack_func,
+ _add_update_compo_action (comp, G_CALLBACK (_initialize_stack_func),
COMP_UPDATE_STACK_INITIALIZE);
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
_stop_task (comp);
- _remove_all_update_sources (comp);
- _remove_all_seek_sources (comp);
+ _remove_update_actions (comp);
+ _remove_seek_actions (comp);
_set_all_children_state (comp, GST_STATE_READY);
nle_composition_reset (comp);
case GST_STATE_CHANGE_READY_TO_NULL:
_stop_task (comp);
- _remove_all_update_sources (comp);
- _remove_all_seek_sources (comp);
+ _remove_update_actions (comp);
+ _remove_seek_actions (comp);
_set_all_children_state (comp, GST_STATE_NULL);
break;
default:
resync_state:
gst_element_set_locked_state (priv->current_bin, FALSE);
- GST_ERROR ("going back to parent state");
+ GST_DEBUG ("going back to parent state");
gst_element_sync_state_with_parent (priv->current_bin);
- GST_ERROR ("gone back to parent state");
+ GST_DEBUG ("gone back to parent state");
return TRUE;
}
gst_event_set_seqnum (toplevel_seek, seqnum);
_set_real_eos_seqnum_from_seek (comp, toplevel_seek);
- _remove_all_update_sources (comp);
+ _remove_update_actions (comp);
/* If stacks are different, unlink/relink objects */
if (!samestack) {
object = NLE_OBJECT (element);
object->in_composition = TRUE;
- _add_add_object_gsource (comp, object);
+ _add_add_object_action (comp, object);
return TRUE;
}
object = NLE_OBJECT (element);
object->in_composition = FALSE;
- _add_remove_object_gsource (comp, object);
+ _add_remove_object_action (comp, object);
return TRUE;
}