More updates to the non-cothread scheduler
authorWim Taymans <wim.taymans@gmail.com>
Thu, 12 Sep 2002 19:22:03 +0000 (19:22 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 12 Sep 2002 19:22:03 +0000 (19:22 +0000)
Original commit message from CVS:
More updates to the non-cothread scheduler

gst/schedulers/gstoptimalscheduler.c

index 70121b7..b371c4e 100644 (file)
@@ -28,6 +28,7 @@
 
 #define GST_ELEMENT_SCHED_CONTEXT(elem)                ((GstOptSchedulerCtx*) (GST_ELEMENT_CAST (elem)->sched_private))
 #define GST_ELEMENT_SCHED_GROUP(elem)          (GST_ELEMENT_SCHED_CONTEXT (elem)->group)
+#define GST_PAD_BUFLIST(pad)                   ((GList*) (GST_REAL_PAD_CAST(pad)->sched_private))
 
 #define GST_ELEMENT_COTHREAD_STOPPING                  GST_ELEMENT_SCHEDULER_PRIVATE1
 #define GST_ELEMENT_IS_COTHREAD_STOPPING(element)      GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
@@ -58,15 +59,19 @@ typedef enum {
 } GstOptSchedulerState;
 
 struct _GstOptScheduler {
-  GstScheduler parent;
+  GstScheduler                  parent;
 
-  GstOptSchedulerState state;
+  GstOptSchedulerState  state;
 
-  cothread_context *context;
-  gboolean use_cothreads;
+  cothread_context     *context;
+  gboolean              use_cothreads;
+  gint                          iterations;
 
-  GSList *elements;
-  GSList *chains;
+  GSList               *elements;
+  GSList               *chains;
+
+  GList                        *runqueue;
+  gint                  recursion;
 };
 
 struct _GstOptSchedulerClass {
@@ -154,10 +159,22 @@ struct _GstOptSchedulerCtx {
   gint element_type;
 };
 
+enum
+{
+  ARG_0,
+  ARG_USE_COTHREADS,
+  ARG_ITERATIONS,
+};
 
 static void            gst_opt_scheduler_class_init            (GstOptSchedulerClass *klass);
 static void            gst_opt_scheduler_init                  (GstOptScheduler *scheduler);
 
+static void            gst_opt_scheduler_set_property          (GObject *object, guint prop_id,
+                                                                const GValue *value, GParamSpec *pspec);
+static void            gst_opt_scheduler_get_property          (GObject *object, guint prop_id,
+                                                                GValue *value, GParamSpec *pspec);
+
 static void            gst_opt_scheduler_dispose               (GObject *object);
 
 static void            gst_opt_scheduler_setup                 (GstScheduler *sched);
@@ -218,8 +235,17 @@ gst_opt_scheduler_class_init (GstOptSchedulerClass *klass)
 
   parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
 
+  gobject_class->set_property   = GST_DEBUG_FUNCPTR (gst_opt_scheduler_set_property);
+  gobject_class->get_property   = GST_DEBUG_FUNCPTR (gst_opt_scheduler_get_property);
   gobject_class->dispose       = GST_DEBUG_FUNCPTR (gst_opt_scheduler_dispose);
 
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_USE_COTHREADS,
+    g_param_spec_boolean ("use_cothreads", "Use cothreads", "Should this scheduler use cothreads",
+                          TRUE, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_USE_COTHREADS,
+    g_param_spec_int ("iterations", "Iterations", "Number of groups to schedule in one iteration (-1 == until EOS/error)",
+                      -1, G_MAXINT, 1, G_PARAM_READWRITE));
+
   gstscheduler_class->setup             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_setup);
   gstscheduler_class->reset             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_reset);
   gstscheduler_class->add_element      = GST_DEBUG_FUNCPTR (gst_opt_scheduler_add_element);
@@ -242,8 +268,9 @@ static void
 gst_opt_scheduler_init (GstOptScheduler *scheduler)
 {
   scheduler->elements = NULL;
-  scheduler->use_cothreads = FALSE;
+  //scheduler->use_cothreads = FALSE;
   scheduler->use_cothreads = TRUE;
+  scheduler->iterations = 1;
 }
 
 static void
@@ -279,31 +306,6 @@ GstPluginDesc plugin_desc = {
   plugin_init
 };
 
-/*
- * Entry points for this scheduler.
- */
-static void
-gst_opt_scheduler_setup (GstScheduler *sched)
-{   
-  GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
-             
-  /* first create thread context */
-  if (osched->context == NULL && osched->use_cothreads) {
-    GST_DEBUG (GST_CAT_SCHEDULING, "initializing cothread context");
-    osched->context = do_cothread_context_init ();
-  }
-} 
-  
-static void 
-gst_opt_scheduler_reset (GstScheduler *sched)
-{ 
-  GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
-             
-  if (osched->context && osched->use_cothreads) {
-    do_cothread_context_destroy (osched->context);
-    osched->context = NULL; 
-  }
-}     
 
 static void
 delete_chain (GstOptScheduler *osched, GstOptSchedulerChain *chain)
@@ -506,6 +508,9 @@ remove_from_group (GstOptSchedulerGroup *group, GstElement *element)
 }
 */
 
+/* this function enables/disables an element, it will set/clear a flag on the element 
+ * and tells the chain that the group is enabled if all elements inside the group are
+ * enabled */
 static void
 group_element_set_enabled (GstOptSchedulerGroup *group, GstElement *element, gboolean enabled)
 {
@@ -531,86 +536,91 @@ group_element_set_enabled (GstOptSchedulerGroup *group, GstElement *element, gbo
   }
 }
 
-static int 
+/* a group is scheduled by doing a cothread switch to it or
+ * by calling the schedule function. In the non-cothread case
+ * we cannot run already running groups so we return FALSE here
+ * to indicate this to the caller */
+static gboolean 
 schedule_group (GstOptSchedulerGroup *group) 
 {
   if (group->chain->sched->use_cothreads) {
     if (group->cothread)
       do_cothread_switch (group->cothread);
-    return 1;
+    return TRUE;
   }
   else {
-    /* in the no cothread case, we cannot schedule already running groups */
-    if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
-      group->schedulefunc (group->argc, group->argv);
-      return 1;
-    }
+    group->schedulefunc (group->argc, group->argv);
+    return TRUE;
   }
-  return 0;
+  return FALSE;
 }
 
-static void 
-schedule_chain (GstOptSchedulerChain *chain) 
+static void
+gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
 {
-  GSList *groups = chain->groups;
+  GST_INFO (GST_CAT_SCHEDULING, "entering scheduler run queue recursion %d", osched->recursion);
 
-  while (groups) {
-    GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
-    groups = g_slist_next (groups);
+  osched->recursion++;
 
-    if (!GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group)) {
-      GST_INFO (GST_CAT_SCHEDULING, "scheduling group %p in chain %p", 
-               group, chain);
+  while (osched->runqueue) {
+    GstOptSchedulerGroup *group;
+    
+    group = (GstOptSchedulerGroup *) osched->runqueue->data;
+    osched->runqueue = g_list_remove (osched->runqueue, group);
 
-      schedule_group (group);
-      break;
-    }
-  }
-}
+    GST_INFO (GST_CAT_SCHEDULING, "scheduling %p", group);
 
-static void
-gst_opt_scheduler_add_element (GstScheduler *sched, GstElement *element)
-{
-  GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
-  GstOptSchedulerCtx *ctx;
+    schedule_group (group);
 
-  GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to scheduler", GST_ELEMENT_NAME (element));
+    GST_INFO (GST_CAT_SCHEDULING, "done scheduling %p", group);
+  }
 
-  if (GST_ELEMENT_IS_DECOUPLED (element))
-    return;
+  GST_INFO (GST_CAT_SCHEDULING, "run queue length after scheduling %d", g_list_length (osched->runqueue));
 
-  ctx = g_new0 (GstOptSchedulerCtx, 1);
-  GST_ELEMENT_SCHED_CONTEXT (element) = ctx;
+  osched->recursion--;
+}
 
-  if (element->loopfunc) {
-    GstOptSchedulerGroup *group;
-    GstOptSchedulerChain *chain;
+/* a chain is scheduled by picking the first active group and scheduling it */
+static void 
+schedule_chain (GstOptSchedulerChain *chain) 
+{
+  GSList *groups = chain->groups;
 
-    chain = create_chain (osched);
+  while (groups) {
+    GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
 
-    group = create_group (chain, element);
-    group->entry = element;
-    group->type = GST_OPT_SCHEDULER_GROUP_LOOP;
+    groups = g_slist_next (groups);
 
-    GST_INFO (GST_CAT_SCHEDULING, "added element \"%s\" as loop based entry", GST_ELEMENT_NAME (element));
-  }
-}
+    if (!GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group)) {
+      GstOptScheduler *osched;
 
-static void
-gst_opt_scheduler_remove_element (GstScheduler *sched, GstElement *element)
-{
-  //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+      osched = chain->sched;
 
-  GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from scheduler", GST_ELEMENT_NAME (element));
+      GST_INFO (GST_CAT_SCHEDULING, "scheduling group %p in chain %p", 
+               group, chain);
 
-  g_free (GST_ELEMENT_SCHED_CONTEXT (element));
-  GST_ELEMENT_SCHED_CONTEXT (element) = NULL;
+      if (osched->use_cothreads) {
+       schedule_group (group);
+      }
+      else {
+        osched->recursion = 0;
+        osched->runqueue = g_list_append (osched->runqueue, group);
+        gst_opt_scheduler_schedule_run_queue (osched);
+      }
 
-  g_warning ("remove implement me");
+      GST_INFO (GST_CAT_SCHEDULING, "done scheduling group %p in chain %p", 
+               group, chain);
+      break;
+    }
+  }
 }
 
+/* a get-based group is scheduled by getting a buffer from the get based
+ * entry point and by pushing the buffer to the peer.
+ * We also set the running flag on this group for as long as this
+ * function is running. */
 static int
-get_wrapper (int argc, char *argv[])
+get_group_schedule_function (int argc, char *argv[])
 {
   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
   const GList *pads = gst_element_get_pad_list (group->entry);
@@ -641,8 +651,12 @@ get_wrapper (int argc, char *argv[])
   return 0;
 }
 
+/* a loop-based group is scheduled by calling the loop function
+ * on the entry point. 
+ * We also set the running flag on this group for as long as this
+ * function is running. */
 static int
-loop_wrapper (int argc, char *argv[])
+loop_group_schedule_function (int argc, char *argv[])
 {
   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
   GstElement *entry = group->entry;
@@ -662,8 +676,9 @@ loop_wrapper (int argc, char *argv[])
 
 }
 
+/* the function to schedule an unkown group, which just gives an error */
 static int
-unkown_wrapper (int argc, char *argv[])
+unkown_group_schedule_function (int argc, char *argv[])
 {
   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
 
@@ -674,59 +689,89 @@ unkown_wrapper (int argc, char *argv[])
   return 0;
 }
 
+/* this function is called when the first element of a chain-loop or a loop-loop
+ * connection performs a push to the loop element. We then schedule the
+ * group with the loop-based element until the bufpen is empty */
 static void
 gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstBuffer *buffer)
 {
   GstOptSchedulerGroup *group;
+  GstOptScheduler *osched;
 
   GST_INFO (GST_CAT_SCHEDULING, "loop wrapper, putting buffer in bufpen");
 
   group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (sinkpad));
+  osched = group->chain->sched;
 
-  if (GST_RPAD_BUFPEN (GST_RPAD_PEER (sinkpad))) {
-    g_warning ("scheduling error, bufpen not empty, disabling group %p", group);
-    chain_group_set_enabled (group->chain, group, FALSE);
-    group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
-    return;
-  }
-
-  GST_RPAD_BUFPEN (GST_RPAD_PEER (sinkpad)) = buffer;
 
-  while (GST_RPAD_BUFPEN (GST_RPAD_PEER (sinkpad))) {
-    if (!schedule_group (group))
-      break;
+  if (osched->use_cothreads) {
+    if (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad))) {
+      g_warning ("deadlock detected, disabling group %p", group);
+      chain_group_set_enabled (group->chain, group, FALSE);
+      group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
+    }
+    else {
+      GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), buffer);
+      schedule_group (group);
+    }
   }
+  else if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
+    GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), buffer);
+    osched->runqueue = g_list_append (osched->runqueue, group);
+  }
+  
+  GST_INFO (GST_CAT_SCHEDULING, "after loop wrapper buflist %d", 
+           g_list_length (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad))));
 }
 
+/* this function is called by a loop based element that performs a
+ * pull on a sinkpad. We schedule the peer group until the bufpen
+ * is filled with the buffer so that this function  can return */
 static GstBuffer*
 gst_opt_scheduler_get_wrapper (GstPad *srcpad)
 {
-  GstBuffer *buffer;
+  GstBuffer *buffer = NULL;
 
   GST_INFO (GST_CAT_SCHEDULING, "get wrapper, removing buffer from bufpen");
 
-  buffer = GST_RPAD_BUFPEN (srcpad);
+  if (GST_PAD_BUFLIST (srcpad))
+    buffer = GST_PAD_BUFLIST (srcpad)->data;
 
   while (!buffer) {
     GstOptSchedulerGroup *group;
+    GstOptScheduler *osched;
     
     group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (srcpad));
+    osched = group->chain->sched;
 
-    if (!schedule_group (group)) {
+    if (osched->use_cothreads) {
+      schedule_group (group);
+    }
+    else if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
+      osched->runqueue = g_list_append (osched->runqueue, group);
+      gst_opt_scheduler_schedule_run_queue (osched);
+    }
+    else {
       g_warning ("deadlock detected, disabling group %p", group);
       chain_group_set_enabled (group->chain, group, FALSE);
       group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
       return NULL;
     }
     
-    buffer = GST_RPAD_BUFPEN (srcpad);
+    if (GST_PAD_BUFLIST (srcpad)) {
+      buffer = (GstBuffer *) GST_PAD_BUFLIST (srcpad)->data;
+    }
   }
+  GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), buffer);
 
-  GST_RPAD_BUFPEN (srcpad) = NULL;
+  GST_INFO (GST_CAT_SCHEDULING, "get wrapper, returning buffer %d",
+           g_list_length (GST_PAD_BUFLIST (srcpad)));
 
   return buffer;
 }
 
+/* this function is a chain wrapper for non-event-aware plugins,
+ * it'll simply dispatch the events to the (default) event handler */
 static void
 gst_opt_scheduler_chain_wrapper (GstPad *sinkpad, GstBuffer *buffer)
 {
@@ -738,17 +783,21 @@ gst_opt_scheduler_chain_wrapper (GstPad *sinkpad, GstBuffer *buffer)
   }
 }
 
+/* setup the scheduler context for a group. The right schedule function
+ * is selected based on the group type and cothreads are created if 
+ * needed */
 static void 
 setup_group_scheduler (GstOptScheduler *osched, GstOptSchedulerGroup *group) 
 {
   GroupScheduleFunction wrapper;
 
-  wrapper = unkown_wrapper;
+  wrapper = unkown_group_schedule_function;
 
+  /* figure out the wrapper function for this group */
   if (group->type == GST_OPT_SCHEDULER_GROUP_GET)
-    wrapper = get_wrapper;
+    wrapper = get_group_schedule_function;
   else if (group->type == GST_OPT_SCHEDULER_GROUP_LOOP)
-    wrapper = loop_wrapper;
+    wrapper = loop_group_schedule_function;
        
   if (osched->use_cothreads) {
     if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
@@ -773,9 +822,11 @@ gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gi
 {
   GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
   GstOptSchedulerGroup *group;
+  GstElementStateReturn res = GST_STATE_SUCCESS;
   
   GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" state change %d", GST_ELEMENT_NAME (element), transition);
 
+  /* we check the state of the managing pipeline here */
   if (GST_IS_BIN (element)) {
     if (GST_SCHEDULER_PARENT (sched) == element) {
       GST_INFO (GST_CAT_SCHEDULING, "parent \"%s\" changed state", GST_ELEMENT_NAME (element));
@@ -793,58 +844,38 @@ gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gi
           GST_INFO (GST_CAT_SCHEDULING, "no interesting state change, doing nothing");
       }
     }
-    return GST_STATE_SUCCESS;
+    return res;
   }
 
+  /* we don't care about decoupled elements after this */
   if (GST_ELEMENT_IS_DECOUPLED (element))
     return GST_STATE_SUCCESS;
 
+  /* get the group of the element */
   group = GST_ELEMENT_SCHED_GROUP (element);
 
   switch (transition) {
     case GST_STATE_PAUSED_TO_PLAYING:
-      setup_group_scheduler (osched, group);
-      group_element_set_enabled (group, element, TRUE);
+      /* an element withut a group has to be an unconnected src, sink
+       * filter element */
+      if (!group)
+       res = GST_STATE_FAILURE;
+      /* else construct the scheduling context of this group and enable it */
+      else {
+        setup_group_scheduler (osched, group);
+        group_element_set_enabled (group, element, TRUE);
+      }
       break;
     case GST_STATE_PLAYING_TO_PAUSED:
-      group_element_set_enabled (group, element, FALSE);
+      /* if the element still has a group, we disable it */
+      if (group) 
+        group_element_set_enabled (group, element, FALSE);
       break;
     default:
       break;
   }
 
-  return GST_STATE_SUCCESS;
-}
-
-static void
-gst_opt_scheduler_lock_element (GstScheduler *sched, GstElement *element)
-{
-  //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
-}
-
-static void
-gst_opt_scheduler_unlock_element (GstScheduler *sched, GstElement *element)
-{
-  //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
-}
-
-static void
-gst_opt_scheduler_yield (GstScheduler *sched, GstElement *element)
-{
-  //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
-}
-
-static gboolean
-gst_opt_scheduler_interrupt (GstScheduler *sched, GstElement *element)
-{
-  //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
-  return TRUE;
-}
-
-static void
-gst_opt_scheduler_error (GstScheduler *sched, GstElement *element)
-{
-  //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+  return res;
 }
 
 /*
@@ -917,6 +948,114 @@ typedef enum {
   GST_OPT_LOOP_TO_LOOP,
 } ConnectionType;
 
+/*
+ * Entry points for this scheduler.
+ */
+static void
+gst_opt_scheduler_setup (GstScheduler *sched)
+{   
+  GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+             
+  /* first create thread context */
+  if (osched->context == NULL && osched->use_cothreads) {
+    GST_DEBUG (GST_CAT_SCHEDULING, "initializing cothread context");
+    osched->context = do_cothread_context_init ();
+  }
+} 
+  
+static void 
+gst_opt_scheduler_reset (GstScheduler *sched)
+{ 
+  GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+             
+  if (osched->context && osched->use_cothreads) {
+    do_cothread_context_destroy (osched->context);
+    osched->context = NULL; 
+  }
+}     
+static void
+gst_opt_scheduler_add_element (GstScheduler *sched, GstElement *element)
+{
+  GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+  GstOptSchedulerCtx *ctx;
+
+  GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to scheduler", GST_ELEMENT_NAME (element));
+
+  /* decoupled elements are not added to the scheduler lists */
+  if (GST_ELEMENT_IS_DECOUPLED (element))
+    return;
+
+  ctx = g_new0 (GstOptSchedulerCtx, 1);
+  GST_ELEMENT_SCHED_CONTEXT (element) = ctx;
+
+  /* loop based elements *always* end up in their own group. It can eventually
+   * be merged with another group when a connection is made */
+  if (element->loopfunc) {
+    GstOptSchedulerGroup *group;
+    GstOptSchedulerChain *chain;
+
+    chain = create_chain (osched);
+
+    group = create_group (chain, element);
+    group->entry = element;
+    group->type = GST_OPT_SCHEDULER_GROUP_LOOP;
+
+    GST_INFO (GST_CAT_SCHEDULING, "added element \"%s\" as loop based entry", GST_ELEMENT_NAME (element));
+  }
+}
+
+static void
+gst_opt_scheduler_remove_element (GstScheduler *sched, GstElement *element)
+{
+  //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+
+  GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from scheduler", GST_ELEMENT_NAME (element));
+
+  g_free (GST_ELEMENT_SCHED_CONTEXT (element));
+  GST_ELEMENT_SCHED_CONTEXT (element) = NULL;
+
+  g_warning ("remove implement me");
+}
+
+static void
+gst_opt_scheduler_lock_element (GstScheduler *sched, GstElement *element)
+{
+  //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+  g_warning ("lock element, implement me");
+}
+
+static void
+gst_opt_scheduler_unlock_element (GstScheduler *sched, GstElement *element)
+{
+  //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+  g_warning ("unlock element, implement me");
+}
+
+static void
+gst_opt_scheduler_yield (GstScheduler *sched, GstElement *element)
+{
+  //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+}
+
+static gboolean
+gst_opt_scheduler_interrupt (GstScheduler *sched, GstElement *element)
+{
+  //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+  
+  g_warning ("interrupt element, implement me");
+
+  return TRUE;
+}
+
+static void
+gst_opt_scheduler_error (GstScheduler *sched, GstElement *element)
+{
+  GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+
+  osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
+}
+
+/* connect pads, merge groups and chains */
 static void
 gst_opt_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
 {
@@ -1056,6 +1195,9 @@ static GstPad*
 gst_opt_scheduler_pad_select (GstScheduler *sched, GList *padlist)
 {
   //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
+  
+  g_warning ("pad select, implement me");
+
   return NULL;
 }
 
@@ -1066,35 +1208,51 @@ gst_opt_scheduler_clock_wait (GstScheduler *sched, GstElement *element,
   return gst_clock_wait (clock, time, jitter);
 }
 
+/* a scheduler iteration is done by looping and scheduling the active chains */
 static GstSchedulerState
 gst_opt_scheduler_iterate (GstScheduler *sched)
 {
-  GstSchedulerState state;
+  GstSchedulerState state = GST_SCHEDULER_STATE_STOPPED;
   GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
-  GSList *chains;
-  gboolean scheduled = FALSE;
+  gint iterations = osched->iterations;
 
   osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
 
-  chains = osched->chains;
-  while (chains) {
-    GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
-    chains = g_slist_next (chains);
+  while (iterations) {
+    gboolean scheduled = FALSE;
+    GSList *chains;
+
+    /* we have to schedule each of the scheduler chains now */
+    chains = osched->chains;
+    while (chains) {
+      GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
+      chains = g_slist_next (chains);
 
-    if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
-      schedule_chain (chain);
-      scheduled = TRUE;
+      /* if the chain is not disabled, schedule it */
+      if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
+        schedule_chain (chain);
+        scheduled = TRUE;
+      }
     }
-  }
 
-  if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
-    state = GST_SCHEDULER_STATE_ERROR;
-  }
-  else {
-    if (scheduled)
-      state = GST_SCHEDULER_STATE (sched);
-    else
-      state = GST_SCHEDULER_STATE_STOPPED;
+    /* at this point it's possible that the scheduler state is
+     * in error, we then return an error */
+    if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
+      state = GST_SCHEDULER_STATE_ERROR;
+      break;
+    }
+    else {
+      /* if chains were scheduled, return our current state */
+      if (scheduled)
+        state = GST_SCHEDULER_STATE (sched);
+      /* if no chains were scheduled, we say we are stopped */
+      else {
+        state = GST_SCHEDULER_STATE_STOPPED;
+        break;
+      }
+    }
+    if (iterations > 0)
+      iterations--;
   }
 
   return state;
@@ -1134,3 +1292,50 @@ gst_opt_scheduler_show (GstScheduler *sched)
     }
   }
 }
+
+static void
+gst_opt_scheduler_get_property (GObject *object, guint prop_id,
+                                GValue *value, GParamSpec *pspec)
+{
+  GstOptScheduler *osched;
+  
+  g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
+
+  osched = GST_OPT_SCHEDULER_CAST (object);
+
+  switch (prop_id) {
+    case ARG_USE_COTHREADS:
+      g_value_set_boolean (value, osched->use_cothreads);
+      break;
+    case ARG_ITERATIONS:
+      g_value_set_int (value, osched->iterations);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_opt_scheduler_set_property (GObject *object, guint prop_id,
+                               const GValue *value, GParamSpec *pspec)
+{
+  GstOptScheduler *osched;
+  
+  g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
+
+  osched = GST_OPT_SCHEDULER_CAST (object);
+
+  switch (prop_id) {
+    case ARG_USE_COTHREADS:
+      osched->use_cothreads = g_value_get_boolean (value);
+      break;
+    case ARG_ITERATIONS:
+      osched->iterations = g_value_get_int (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+