From f8114cc17880d29cc83ae944c541db552b148751 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 2 Feb 2005 15:31:06 +0000 Subject: [PATCH] gst/schedulers/gstoptimalscheduler.c: Added lock to protect scheduler data structures. Original commit message from CVS: * gst/schedulers/gstoptimalscheduler.c: (gst_opt_scheduler_class_init), (gst_opt_scheduler_init), (gst_opt_scheduler_finalize), (remove_decoupled), (schedule_chain), (get_invalid_call), (chain_invalid_call), (get_group_schedule_function), (loop_group_schedule_function), (gst_opt_scheduler_loop_wrapper), (gst_opt_scheduler_get_wrapper), (gst_opt_scheduler_state_transition), (gst_opt_scheduler_add_element), (gst_opt_scheduler_remove_element), (gst_opt_scheduler_interrupt), (gst_opt_scheduler_error), (gst_opt_scheduler_pad_link), (gst_opt_scheduler_pad_unlink), (gst_opt_scheduler_iterate), (gst_opt_scheduler_show): Added lock to protect scheduler data structures. --- ChangeLog | 16 +++ gst/schedulers/gstoptimalscheduler.c | 188 +++++++++++++++++++++++++++++++---- 2 files changed, 183 insertions(+), 21 deletions(-) diff --git a/ChangeLog b/ChangeLog index 47f0ded..1e23554 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,19 @@ +2005-02-02 Wim Taymans + + * gst/schedulers/gstoptimalscheduler.c: + (gst_opt_scheduler_class_init), (gst_opt_scheduler_init), + (gst_opt_scheduler_finalize), (remove_decoupled), (schedule_chain), + (get_invalid_call), (chain_invalid_call), + (get_group_schedule_function), (loop_group_schedule_function), + (gst_opt_scheduler_loop_wrapper), (gst_opt_scheduler_get_wrapper), + (gst_opt_scheduler_state_transition), + (gst_opt_scheduler_add_element), + (gst_opt_scheduler_remove_element), (gst_opt_scheduler_interrupt), + (gst_opt_scheduler_error), (gst_opt_scheduler_pad_link), + (gst_opt_scheduler_pad_unlink), (gst_opt_scheduler_iterate), + (gst_opt_scheduler_show): + Added lock to protect scheduler data structures. + 2005-02-01 Ronald S. Bultje * testsuite/threads/threadi.c: (cb_data): diff --git a/gst/schedulers/gstoptimalscheduler.c b/gst/schedulers/gstoptimalscheduler.c index d539835..b21bd15 100644 --- a/gst/schedulers/gstoptimalscheduler.c +++ b/gst/schedulers/gstoptimalscheduler.c @@ -73,10 +73,15 @@ typedef enum } GstOptSchedulerState; +#define GST_OPT_LOCK(sched) (g_static_rec_mutex_lock (&((GstOptScheduler *)sched)->lock)) +#define GST_OPT_UNLOCK(sched) (g_static_rec_mutex_unlock (&((GstOptScheduler *)sched)->lock)) + struct _GstOptScheduler { GstScheduler parent; + GStaticRecMutex lock; + GstOptSchedulerState state; #ifdef USE_COTHREADS @@ -254,7 +259,7 @@ static void chain_recursively_migrate_group (GstOptSchedulerChain * chain, GstOptSchedulerGroup * group); static void chain_group_set_enabled (GstOptSchedulerChain * chain, GstOptSchedulerGroup * group, gboolean enabled); -static void schedule_chain (GstOptSchedulerChain * chain); +static gboolean schedule_chain (GstOptSchedulerChain * chain); /* @@ -323,6 +328,7 @@ static void gst_opt_scheduler_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); static void gst_opt_scheduler_dispose (GObject * object); +static void gst_opt_scheduler_finalize (GObject * object); static void gst_opt_scheduler_setup (GstScheduler * sched); static void gst_opt_scheduler_reset (GstScheduler * sched); @@ -391,6 +397,7 @@ gst_opt_scheduler_class_init (GstOptSchedulerClass * klass) gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_opt_scheduler_get_property); gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_opt_scheduler_dispose); + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_opt_scheduler_finalize); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ITERATIONS, g_param_spec_int ("iterations", "Iterations", @@ -431,6 +438,8 @@ gst_opt_scheduler_class_init (GstOptSchedulerClass * klass) static void gst_opt_scheduler_init (GstOptScheduler * scheduler) { + g_static_rec_mutex_init (&scheduler->lock); + scheduler->elements = NULL; scheduler->iterations = 1; scheduler->max_recursion = 100; @@ -445,6 +454,16 @@ gst_opt_scheduler_dispose (GObject * object) G_OBJECT_CLASS (parent_class)->dispose (object); } +static void +gst_opt_scheduler_finalize (GObject * object) +{ + GstOptScheduler *osched = GST_OPT_SCHEDULER (object); + + g_static_rec_mutex_free (&osched->lock); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + static gboolean plugin_init (GstPlugin * plugin) { @@ -897,6 +916,46 @@ add_to_group (GstOptSchedulerGroup * group, GstElement * element, return group; } +/* we need to remove a decoupled element from the scheduler, this + * involves finding all the groups that have this element as an + * entry point and disabling them. */ +static void +remove_decoupled (GstScheduler * sched, GstElement * element) +{ + GSList *chains; + GList *schedulers; + GstOptScheduler *osched = GST_OPT_SCHEDULER (sched); + + GST_DEBUG_OBJECT (sched, "removing decoupled element \"%s\"", + GST_OBJECT_NAME (element)); + + for (chains = osched->chains; chains; chains = g_slist_next (chains)) { + GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data; + GSList *groups; + + for (groups = chain->groups; groups; groups = g_slist_next (groups)) { + GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data; + + if (group->entry) { + GST_DEBUG_OBJECT (sched, "group %p, entry %s", group, + GST_STR_NULL (GST_OBJECT_NAME (group->entry))); + } + + if (group->entry == element) { + GST_DEBUG ("clearing element %p \"%s\" as entry from group %p", + element, GST_ELEMENT_NAME (element), group); + group->entry = NULL; + group->type = GST_OPT_SCHEDULER_GROUP_UNKNOWN; + } + } + } + /* and remove from any child scheduler */ + for (schedulers = sched->schedulers; schedulers; + schedulers = g_list_next (schedulers)) { + remove_decoupled (GST_SCHEDULER (schedulers->data), element); + } +} + static GstOptSchedulerGroup * remove_from_group (GstOptSchedulerGroup * group, GstElement * element) { @@ -1235,11 +1294,12 @@ gst_opt_scheduler_schedule_run_queue (GstOptScheduler * osched, #endif /* a chain is scheduled by picking the first active group and scheduling it */ -static void +static gboolean schedule_chain (GstOptSchedulerChain * chain) { GSList *groups; GstOptScheduler *osched; + gboolean scheduled = FALSE; osched = chain->sched; @@ -1249,6 +1309,7 @@ schedule_chain (GstOptSchedulerChain * chain) sort_chain (chain); GST_OPT_SCHEDULER_CHAIN_SET_CLEAN (chain); + /* FIXME handle the case where the groups change during scheduling */ groups = chain->groups; while (groups) { GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data; @@ -1268,6 +1329,8 @@ schedule_chain (GstOptSchedulerChain * chain) gst_opt_scheduler_schedule_run_queue (osched, NULL); #endif + scheduled = TRUE; + GST_LOG ("done scheduling group %p in chain %p", group, chain); unref_group (group); break; @@ -1275,6 +1338,31 @@ schedule_chain (GstOptSchedulerChain * chain) groups = g_slist_next (groups); } + return scheduled; +} + +/* this function is inserted in the gethandler when you are not + * supposed to call _pull on the pad. */ +static GstData * +get_invalid_call (GstPad * pad) +{ + GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL), + ("get on pad %s:%s but the peer is operating chain based and so is not " + "allowed to pull, fix the element.", GST_DEBUG_PAD_NAME (pad))); + + return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT)); +} + +/* this function is inserted in the chainhandler when you are not + * supposed to call _push on the pad. */ +static void +chain_invalid_call (GstPad * pad, GstData * data) +{ + GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL), + ("chain on pad %s:%s but the pad is get based", + GST_DEBUG_PAD_NAME (pad))); + + gst_data_unref (data); } /* a get-based group is scheduled by getting a buffer from the get based @@ -1287,16 +1375,21 @@ get_group_schedule_function (int argc, char *argv[]) GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv; GstElement *entry = group->entry; const GList *pads; + GstOptScheduler *osched; /* what if the entry point disappeared? */ - if (!entry) + if (entry == NULL || group->chain == NULL) return 0; + + osched = group->chain->sched; + pads = gst_element_get_pad_list (entry); GST_LOG ("executing get-based group %p", group); group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING; + GST_OPT_UNLOCK (osched); while (pads) { GstData *data; GstPad *pad = GST_PAD (pads->data); @@ -1320,6 +1413,7 @@ get_group_schedule_function (int argc, char *argv[]) gst_pad_push (pad, data); } } + GST_OPT_LOCK (osched); group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING; @@ -1329,7 +1423,8 @@ get_group_schedule_function (int argc, char *argv[]) /* a loop-based group is scheduled by calling the loop function * on the entry point. * We also set the running flag on this group for as long as this - * function is running. */ + * function is running. + * This function should be called with the scheduler lock held. */ static int loop_group_schedule_function (int argc, char *argv[]) { @@ -1343,9 +1438,16 @@ loop_group_schedule_function (int argc, char *argv[]) GST_DEBUG ("calling loopfunc of element %s in group %p", GST_ELEMENT_NAME (entry), group); - if (entry->loopfunc) + if (group->chain == NULL) + return 0; + + if (entry->loopfunc) { + GstOptScheduler *osched = group->chain->sched; + + GST_OPT_UNLOCK (osched); entry->loopfunc (entry); - else + GST_OPT_LOCK (osched); + } else group_error_handler (group); GST_LOG ("returned from loopfunc of element %s in group %p", @@ -1386,6 +1488,7 @@ gst_opt_scheduler_loop_wrapper (GstPad * sinkpad, GstData * data) GST_LOG ("chain handler for loop-based pad %" GST_PTR_FORMAT, sinkpad); + GST_OPT_LOCK (osched); #ifdef USE_COTHREADS if (GST_PAD_DATALIST (peer)) { g_warning ("deadlock detected, disabling group %p", group); @@ -1408,6 +1511,7 @@ gst_opt_scheduler_loop_wrapper (GstPad * sinkpad, GstData * data) } } #endif + GST_OPT_UNLOCK (osched); GST_LOG ("%d datas left on %s:%s's datapen after chain handler", g_list_length (GST_PAD_DATALIST (peer)), GST_DEBUG_PAD_NAME (peer)); @@ -1449,6 +1553,7 @@ gst_opt_scheduler_get_wrapper (GstPad * srcpad) data = NULL; disabled = FALSE; + GST_OPT_LOCK (osched); do { GST_LOG ("scheduling upstream group %p to fill datapen", group); #ifdef USE_COTHREADS @@ -1479,7 +1584,8 @@ gst_opt_scheduler_get_wrapper (GstPad * srcpad) * this is not allowed in the optimal scheduler (yet) */ g_warning ("deadlock detected, disabling group %p", group); group_error_handler (group); - return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT)); + data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT)); + goto done; } #endif /* if the scheduler interrupted, make sure we send an INTERRUPTED event @@ -1500,6 +1606,11 @@ gst_opt_scheduler_get_wrapper (GstPad * srcpad) } while (data == NULL); +#ifndef USE_COTHREADS +done: +#endif + GST_OPT_UNLOCK (osched); + GST_LOG ("get handler, returning data %p, queue length %d", data, g_list_length (GST_PAD_DATALIST (srcpad))); @@ -1560,6 +1671,7 @@ gst_opt_scheduler_state_transition (GstScheduler * sched, GstElement * element, GST_ELEMENT_NAME (element) ? GST_ELEMENT_NAME (element) : "(null)", transition); + GST_OPT_LOCK (sched); /* we check the state of the managing pipeline here */ if (GST_IS_BIN (element)) { if (GST_SCHEDULER_PARENT (sched) == element) { @@ -1579,12 +1691,14 @@ gst_opt_scheduler_state_transition (GstScheduler * sched, GstElement * element, GST_LOG ("no interesting state change, doing nothing"); } } - return res; + goto done; } /* we don't care about decoupled elements after this */ - if (GST_ELEMENT_IS_DECOUPLED (element)) - return GST_STATE_SUCCESS; + if (GST_ELEMENT_IS_DECOUPLED (element)) { + res = GST_STATE_SUCCESS; + goto done; + } /* get the group of the element */ group = GST_ELEMENT_SCHED_GROUP (element); @@ -1617,6 +1731,11 @@ gst_opt_scheduler_state_transition (GstScheduler * sched, GstElement * element, break; } + //gst_scheduler_show (sched); + +done: + GST_OPT_UNLOCK (sched); + return res; } @@ -1889,10 +2008,12 @@ gst_opt_scheduler_add_element (GstScheduler * sched, GstElement * element) GstOptSchedulerGroup *group; GstOptSchedulerChain *chain; + GST_OPT_LOCK (sched); chain = create_chain (osched); group = create_group (chain, element, GST_OPT_SCHEDULER_GROUP_LOOP); group->entry = element; + GST_OPT_UNLOCK (sched); GST_LOG ("added element \"%s\" as loop based entry", GST_ELEMENT_NAME (element)); @@ -1907,10 +2028,13 @@ gst_opt_scheduler_remove_element (GstScheduler * sched, GstElement * element) GST_DEBUG_OBJECT (sched, "removing element \"%s\"", GST_OBJECT_NAME (element)); + GST_OPT_LOCK (sched); /* decoupled elements are not added to the scheduler lists and should therefore * not be removed */ - if (GST_ELEMENT_IS_DECOUPLED (element)) - return; + if (GST_ELEMENT_IS_DECOUPLED (element)) { + remove_decoupled (sched, element); + goto done; + } /* the element is guaranteed to live in it's own group/chain now */ get_group (element, &group); @@ -1920,6 +2044,9 @@ gst_opt_scheduler_remove_element (GstScheduler * sched, GstElement * element) g_free (GST_ELEMENT (element)->sched_private); GST_ELEMENT (element)->sched_private = NULL; + +done: + GST_OPT_UNLOCK (sched); } static gboolean @@ -1956,8 +2083,10 @@ gst_opt_scheduler_interrupt (GstScheduler * sched, GstElement * element) { GstOptScheduler *osched = GST_OPT_SCHEDULER (sched); + GST_OPT_LOCK (sched); GST_INFO ("scheduler set interrupted state"); osched->state = GST_OPT_SCHEDULER_STATE_INTERRUPTED; + GST_OPT_UNLOCK (sched); } return TRUE; #endif @@ -1969,11 +2098,13 @@ gst_opt_scheduler_error (GstScheduler * sched, GstElement * element) GstOptScheduler *osched = GST_OPT_SCHEDULER (sched); GstOptSchedulerGroup *group; + GST_OPT_LOCK (sched); get_group (element, &group); if (group) group_error_handler (group); osched->state = GST_OPT_SCHEDULER_STATE_ERROR; + GST_OPT_UNLOCK (sched); } /* link pads, merge groups and chains */ @@ -1991,6 +2122,7 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad, src_element = GST_PAD_PARENT (srcpad); sink_element = GST_PAD_PARENT (sinkpad); + GST_OPT_LOCK (sched); /* first we need to figure out what type of link we're dealing * with */ if (src_element->loopfunc && sink_element->loopfunc) @@ -2014,7 +2146,7 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad, if (group->entry != sink_element) { g_error ("internal error: cannot schedule get to loop in multi-loop based group"); - return; + goto done; } } } else @@ -2033,7 +2165,7 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad, if (group->entry != src_element) { g_error ("internal error: cannot schedule get to chain " "with mixed loop/chain based group"); - return; + goto done; } } } else @@ -2055,7 +2187,7 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad, GST_LOG ("get to chain based link"); /* setup get/chain handlers */ - GST_RPAD_GETHANDLER (srcpad) = gst_pad_call_get_function; + GST_RPAD_GETHANDLER (srcpad) = get_invalid_call; GST_RPAD_CHAINHANDLER (sinkpad) = gst_pad_call_chain_function; /* the two elements should be put into the same group, @@ -2081,6 +2213,7 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad, case GST_OPT_CHAIN_TO_CHAIN: GST_LOG ("loop/chain to chain based link"); + GST_RPAD_GETHANDLER (srcpad) = get_invalid_call; GST_RPAD_CHAINHANDLER (sinkpad) = gst_pad_call_chain_function; /* the two elements should be put into the same group, this also means @@ -2095,6 +2228,7 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad, GST_LOG ("get to loop based link"); GST_RPAD_GETHANDLER (srcpad) = gst_pad_call_get_function; + GST_RPAD_CHAINHANDLER (sinkpad) = chain_invalid_call; /* the two elements should be put into the same group, this also means * that they are in the same chain automatically, sink_element is @@ -2110,8 +2244,8 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad, GST_LOG ("chain/loop to loop based link"); - GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_loop_wrapper; GST_RPAD_GETHANDLER (srcpad) = gst_opt_scheduler_get_wrapper; + GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_loop_wrapper; /* events on the srcpad have to be intercepted as we might need to * flush the buffer lists, so override the given eventfunc */ GST_RPAD_EVENTHANDLER (srcpad) = gst_opt_scheduler_event_wrapper; @@ -2145,6 +2279,8 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad, g_error ("(internal error) invalid element link, what are you doing?"); break; } +done: + GST_OPT_UNLOCK (sched); } static void @@ -2513,6 +2649,7 @@ gst_opt_scheduler_pad_unlink (GstScheduler * sched, src_element = GST_PAD_PARENT (srcpad); sink_element = GST_PAD_PARENT (sinkpad); + GST_OPT_LOCK (sched); get_group (src_element, &group1); get_group (sink_element, &group2); @@ -2530,7 +2667,7 @@ gst_opt_scheduler_pad_unlink (GstScheduler * sched, if (!group1 || !group2) { GST_LOG ("one (or both) of the elements is not in a group, not interesting"); - return; + goto done; } /* easy part, groups are different */ @@ -2614,6 +2751,8 @@ gst_opt_scheduler_pad_unlink (GstScheduler * sched, } /* at this point the group can be freed and gone, so don't touch */ } +done: + GST_OPT_UNLOCK (sched); } /* a scheduler iteration is done by looping and scheduling the active chains */ @@ -2622,7 +2761,10 @@ gst_opt_scheduler_iterate (GstScheduler * sched) { GstSchedulerState state = GST_SCHEDULER_STATE_STOPPED; GstOptScheduler *osched = GST_OPT_SCHEDULER (sched); - gint iterations = osched->iterations; + gint iterations; + + GST_OPT_LOCK (sched); + iterations = osched->iterations; osched->state = GST_OPT_SCHEDULER_STATE_RUNNING; @@ -2632,7 +2774,8 @@ gst_opt_scheduler_iterate (GstScheduler * sched) gboolean scheduled = FALSE; GSList *chains; - /* we have to schedule each of the scheduler chains now */ + /* we have to schedule each of the scheduler chains now. + * FIXME handle the case where the chains change during iterations. */ chains = osched->chains; while (chains) { GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data; @@ -2641,8 +2784,7 @@ gst_opt_scheduler_iterate (GstScheduler * sched) /* if the chain is not disabled, schedule it */ if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) { GST_LOG ("scheduling chain %p", chain); - schedule_chain (chain); - scheduled = TRUE; + scheduled = schedule_chain (chain); GST_LOG ("scheduled chain %p", chain); } else { GST_LOG ("not scheduling disabled chain %p", chain); @@ -2681,6 +2823,7 @@ gst_opt_scheduler_iterate (GstScheduler * sched) if (iterations > 0) iterations--; } + GST_OPT_UNLOCK (sched); return state; } @@ -2692,6 +2835,8 @@ gst_opt_scheduler_show (GstScheduler * sched) GstOptScheduler *osched = GST_OPT_SCHEDULER (sched); GSList *chains; + GST_OPT_LOCK (sched); + g_print ("iterations: %d\n", osched->iterations); g_print ("max recursion: %d\n", osched->max_recursion); @@ -2739,6 +2884,7 @@ gst_opt_scheduler_show (GstScheduler * sched) } } } + GST_OPT_UNLOCK (sched); } static void -- 2.7.4