cothread_context *context;
gboolean use_cothreads;
- GList *elements;
- GList *chains;
+ GSList *elements;
+ GSList *chains;
};
struct _GstOptSchedulerClass {
GstOptSchedulerChainFlags flags;
- GList *groups; /* the groups in this chain */
+ GSList *groups; /* the groups in this chain */
gint num_groups;
gint num_enabled;
};
GstOptSchedulerGroupFlags flags; /* flags for this group */
GstOptSchedulerGroupType type; /* flags for this group */
- GList *elements; /* elements of this group */
+ GSList *elements; /* elements of this group */
gint num_elements;
gint num_enabled;
GstElement *entry; /* the group's entry point */
g_assert (chain->sched == osched);
- osched->chains = g_list_remove (osched->chains, chain);
+ osched->chains = g_slist_remove (osched->chains, chain);
- g_list_free (chain->groups);
+ g_slist_free (chain->groups);
g_free (chain);
}
g_assert (group->chain == NULL);
- chain->groups = g_list_prepend (chain->groups, group);
+ chain->groups = g_slist_prepend (chain->groups, group);
group->chain = chain;
chain->num_groups++;
}
chain = g_new0 (GstOptSchedulerChain, 1);
chain->sched = osched;
- osched->chains = g_list_prepend (osched->chains, chain);
+ osched->chains = g_slist_prepend (osched->chains, chain);
GST_INFO (GST_CAT_SCHEDULING, "new chain %p", chain);
g_assert (group->chain == chain);
- chain->groups = g_list_remove (chain->groups, group);
+ chain->groups = g_slist_remove (chain->groups, group);
chain->num_groups--;
group->chain = NULL;
static void
merge_chains (GstOptSchedulerChain *chain1, GstOptSchedulerChain *chain2)
{
- GList *walk;
+ GSList *walk;
GST_INFO (GST_CAT_SCHEDULING, "mergin chain %p and %p", chain1, chain2);
group->chain = NULL;
add_to_chain (chain1, group);
- walk = g_list_next (walk);
+ walk = g_slist_next (walk);
}
delete_chain (chain2->sched, chain2);
}
g_assert (GST_ELEMENT_SCHED_GROUP (element) == NULL);
- group->elements = g_list_prepend (group->elements, element);
+ group->elements = g_slist_prepend (group->elements, element);
group->num_elements++;
GST_ELEMENT_SCHED_GROUP (element) = group;
if (group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)
destroy_group_scheduler (group);
- g_list_free (group->elements);
+ g_slist_free (group->elements);
g_free (group);
}
static void
merge_groups (GstOptSchedulerGroup *group1, GstOptSchedulerGroup *group2)
{
- GList *walk;
+ GSList *walk;
GstOptSchedulerChain *chain1;
GstOptSchedulerChain *chain2;
walk = group2->elements;
while (walk) {
add_to_group (group1, (GstElement *)walk->data);
- walk = g_list_next (walk);
+ walk = g_slist_next (walk);
}
chain1 = group1->chain;
{
GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" to group %p", GST_ELEMENT_NAME (element), group);
- group->elements = g_list_remove (group->elements, element);
+ group->elements = g_slist_remove (group->elements, element);
group->num_elements--;
GST_ELEMENT_SCHED_GROUP (element) = NULL;
return 1;
}
else {
- if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING))
- return group->schedulefunc (group->argc, group->argv);
- else
- return 0;
+ /* in the no cothread case, we cannot schedule already running groups */
+ if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
+ group->schedulefunc (group->argc, group->argv);
+ return 1;
+ }
}
+ return 0;
}
static void
schedule_chain (GstOptSchedulerChain *chain)
{
- GList *groups = chain->groups;
+ GSList *groups = chain->groups;
while (groups) {
GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
- groups = g_list_next (groups);
+ groups = g_slist_next (groups);
if (!GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group)) {
GST_INFO (GST_CAT_SCHEDULING, "scheduling group %p in chain %p",
group, chain);
schedule_group (group);
-
break;
}
}
}
static int
-wrapper_function (int argc, char *argv[])
+get_wrapper (int argc, char *argv[])
{
GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
+ const GList *pads = gst_element_get_pad_list (group->entry);
- GST_INFO (GST_CAT_SCHEDULING, "wrapper function of group %p", group);
+ GST_INFO (GST_CAT_SCHEDULING, "get wrapper of group %p", group);
group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
- switch (group->type) {
- case GST_OPT_SCHEDULER_GROUP_GET:
- {
- const GList *pads = gst_element_get_pad_list (group->entry);
- GstBuffer *buffer;
+ while (pads) {
+ GstBuffer *buffer;
+ GstPad *pad = GST_PAD_CAST (pads->data);
+ pads = g_list_next (pads);
- while (pads) {
- GstPad *pad = GST_PAD_CAST (pads->data);
- pads = g_list_next (pads);
+ /* skip sinks and ghostpads */
+ if (!GST_PAD_IS_SRC (pad) || !GST_IS_REAL_PAD (pad))
+ continue;
- /* skip sinks and ghostpads */
- if (!GST_PAD_IS_SRC (pad) || !GST_IS_REAL_PAD (pad))
- continue;
+ GST_INFO (GST_CAT_SCHEDULING, "doing get and push on pad \"%s:%s\" in group %p",
+ GST_DEBUG_PAD_NAME (pad), group);
- GST_INFO (GST_CAT_SCHEDULING, "doing get and push on pad \"%s:%s\" in group %p",
- GST_DEBUG_PAD_NAME (pad), group);
+ buffer = GST_RPAD_GETFUNC (pad) (pad);
+ if (buffer)
+ gst_pad_push (pad, buffer);
+ }
- buffer = GST_RPAD_GETFUNC (pad) (pad);
- if (buffer)
- gst_pad_push (pad, buffer);
- }
- break;
- }
- case GST_OPT_SCHEDULER_GROUP_LOOP:
- {
- GstElement *entry = group->entry;
+ group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
- GST_INFO (GST_CAT_SCHEDULING, "calling loopfunc of element %s in group %p",
- GST_ELEMENT_NAME (entry), group);
+ return 0;
+}
- entry->loopfunc (entry);
- break;
- }
- default:
- g_warning ("(internal error) unkown group type %d, disabling\n", group->type);
- chain_group_set_enabled (group->chain, group, FALSE);
- group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
- break;
- }
+static int
+loop_wrapper (int argc, char *argv[])
+{
+ GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
+ GstElement *entry = group->entry;
+
+ GST_INFO (GST_CAT_SCHEDULING, "loop wrapper of group %p", group);
+
+ group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
+
+ GST_INFO (GST_CAT_SCHEDULING, "calling loopfunc of element %s in group %p",
+ GST_ELEMENT_NAME (entry), group);
+
+ entry->loopfunc (entry);
group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
return 0;
+
+}
+
+static int
+unkown_wrapper (int argc, char *argv[])
+{
+ GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
+
+ g_warning ("(internal error) unkown group type %d, disabling\n", group->type);
+ chain_group_set_enabled (group->chain, group, FALSE);
+ group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
+
+ return 0;
}
static void
group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (sinkpad));
+ if (GST_RPAD_BUFPEN (GST_RPAD_PEER (sinkpad))) {
+ g_warning ("scheduling error, bufpen not empty, disabling group %p", group);
+ chain_group_set_enabled (group->chain, group, FALSE);
+ group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
+ return;
+ }
+
GST_RPAD_BUFPEN (GST_RPAD_PEER (sinkpad)) = buffer;
while (GST_RPAD_BUFPEN (GST_RPAD_PEER (sinkpad))) {
group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (srcpad));
- schedule_group (group);
+ if (!schedule_group (group)) {
+ g_warning ("deadlock detected, disabling group %p", group);
+ chain_group_set_enabled (group->chain, group, FALSE);
+ group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
+ return NULL;
+ }
buffer = GST_RPAD_BUFPEN (srcpad);
}
else {
GST_RPAD_CHAINFUNC (sinkpad) (sinkpad, buffer);
}
-
}
static void
setup_group_scheduler (GstOptScheduler *osched, GstOptSchedulerGroup *group)
{
+ GroupScheduleFunction wrapper;
+
+ wrapper = unkown_wrapper;
+
+ if (group->type == GST_OPT_SCHEDULER_GROUP_GET)
+ wrapper = get_wrapper;
+ else if (group->type == GST_OPT_SCHEDULER_GROUP_LOOP)
+ wrapper = loop_wrapper;
+
if (osched->use_cothreads) {
- do_cothread_create (group->cothread, osched->context,
- wrapper_function, 0, (char **) group);
+ if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
+ do_cothread_create (group->cothread, osched->context,
+ wrapper, 0, (char **) group);
+ }
+ else {
+ do_cothread_setfunc (group->cothread, osched->context,
+ wrapper, 0, (char **) group);
+ }
}
else {
- group->schedulefunc = wrapper_function;
+ group->schedulefunc = wrapper;
group->argc = 0;
group->argv = (char **) group;
}
-
group->flags |= GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
}
switch (transition) {
case GST_STATE_PAUSED_TO_PLAYING:
- if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
- setup_group_scheduler (osched, group);
- }
+ setup_group_scheduler (osched, group);
group_element_set_enabled (group, element, TRUE);
break;
case GST_STATE_PLAYING_TO_PAUSED:
//GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
}
+/*
+ * the idea is to put the two elements into the same group.
+ * - When no element is inside a group, we create a new group and add
+ * the elements to it.
+ * - When one of the elements has a group, add the other element to
+ * that group
+ * - if both of the elements have a group, we merge the groups, which
+ * will also merge the chains.
+ */
static GstOptSchedulerGroup*
group_elements (GstOptScheduler *osched, GstElement *element1, GstElement *element2)
{
if (ctx2)
group2 = ctx2->group;
+ /* none of the elements is added to a group, create a new group
+ * and chain to add the elements to */
if (!group1 && !group2) {
GstOptSchedulerChain *chain;
group = create_group (chain, element1);
add_to_group (group, element2);
}
+ /* the first element has a group */
else if (group1) {
GST_INFO (GST_CAT_SCHEDULING, "adding \"%s\" to \"%s\"'s group",
GST_ELEMENT_NAME (element2), GST_ELEMENT_NAME (element1));
+
+ /* the second element also has a group, merge */
if (group2)
merge_groups (group1, group2);
+ /* the second element has no group, add it to the group
+ * of the first element */
else
add_to_group (group1, element2);
group = group1;
}
+ /* element1 has no group, element2 does. Add element1 to the
+ * group of element2 */
else {
GST_INFO (GST_CAT_SCHEDULING, "adding \"%s\" to \"%s\"'s group",
GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
element1 = GST_PAD_PARENT (srcpad);
element2 = GST_PAD_PARENT (sinkpad);
+ /* first we need to figure out whar type of connection we're dealing
+ * with */
if (element1->loopfunc && element2->loopfunc)
type = GST_OPT_LOOP_TO_LOOP;
else {
}
}
+ /* for each connection type, perform specific actions */
switch (type) {
case GST_OPT_GET_TO_CHAIN:
{
GST_INFO (GST_CAT_SCHEDULING, "get to chain based connection");
+ /* setup get/chain handlers */
GST_RPAD_GETHANDLER (srcpad) = GST_RPAD_GETFUNC (srcpad);
if (GST_ELEMENT_IS_EVENT_AWARE (element2))
GST_RPAD_CHAINHANDLER (sinkpad) = GST_RPAD_CHAINFUNC (sinkpad);
else
GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_chain_wrapper;
+ /* the two elements should be put into the same group,
+ * this also means that they are in the same chain automatically */
group = group_elements (osched, element1, element2);
+ /* if there is not yet an entry in the group, select the source
+ * element as the entry point */
if (!group->entry) {
group->entry = element1;
group->type = GST_OPT_SCHEDULER_GROUP_GET;
else
GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_chain_wrapper;
+ /* the two elements should be put into the same group,
+ * this also means that they are in the same chain automatically,
+ * in case of a loop-based element1, there will be a group for element1 and
+ * element2 will be added to it. */
group_elements (osched, element1, element2);
break;
case GST_OPT_GET_TO_LOOP:
GST_INFO (GST_CAT_SCHEDULING, "get to loop based connection");
GST_RPAD_GETHANDLER (srcpad) = GST_RPAD_GETFUNC (srcpad);
+
+ /* the two elements should be put into the same group,
+ * this also means that they are in the same chain automatically,
+ * element2 is loop-based so it already has a group where element1
+ * will be added to */
group_elements (osched, element1, element2);
break;
case GST_OPT_CHAIN_TO_LOOP:
group1 = GST_ELEMENT_SCHED_GROUP (element1);
group2 = GST_ELEMENT_SCHED_GROUP (element2);
+ /* group2 is guaranteed to exist as it contains a loop-based element.
+ * group1 only exists if element1 is connected to some other element */
if (!group1) {
+ /* create a new group for element1 as it cannot be merged into another group
+ * here. we create the group in the same chain as the loop-based element. */
GST_INFO (GST_CAT_SCHEDULING, "creating new group for element %s", GST_ELEMENT_NAME (element1));
group1 = create_group (group2->chain, element1);
}
else {
+ /* both elements are already in a group, make sure they are added to
+ * the same chain */
merge_chains (group1->chain, group2->chain);
}
break;
{
GstSchedulerState state;
GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
- GList *chains;
+ GSList *chains;
gboolean scheduled = FALSE;
osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
chains = osched->chains;
while (chains) {
GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
- chains = g_list_next (chains);
+ chains = g_slist_next (chains);
if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
schedule_chain (chain);
static void
gst_opt_scheduler_show (GstScheduler *sched)
{
- //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+ GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+ GSList *chains;
+
+ chains = osched->chains;
+ while (chains) {
+ GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
+ GSList *groups = chain->groups;
+ chains = g_slist_next (chains);
+
+ g_print ("+- chain %p: %d groups, %d enabled, flags %d\n", chain, chain->num_groups, chain->num_enabled, chain->flags);
+
+ while (groups) {
+ GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
+ GSList *elements = group->elements;
+ groups = g_slist_next (groups);
+
+ g_print (" +- group %p: %d elements, %d enabled, flags %d, entry %s, %s\n",
+ group, group->num_elements, group->num_enabled, group->flags,
+ (group->entry ? GST_ELEMENT_NAME (group->entry): "(none)"),
+ (group->type == GST_OPT_SCHEDULER_GROUP_GET ? "get-based" : "loop-based") );
+
+ while (elements) {
+ GstElement *element = (GstElement *) elements->data;
+ elements = g_slist_next (elements);
+
+ g_print (" +- element %s\n", GST_ELEMENT_NAME (element));
+ }
+ }
+ }
}