22a53d12fc8a1445f294baca511cf6a571455426
[platform/upstream/gstreamer.git] / gst / schedulers / gstoptimalscheduler.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstscheduler.c: Default scheduling code for most cases
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #include <gst/gst.h>
28
29 GST_DEBUG_CATEGORY_STATIC(debug_scheduler);
30 #define GST_CAT_DEFAULT debug_scheduler
31
32 #ifdef USE_COTHREADS
33 # include "cothreads_compat.h"
34 #else
35 # define COTHREADS_NAME_CAPITAL ""
36 # define COTHREADS_NAME         ""
37 #endif
38
39 #define GST_ELEMENT_SCHED_CONTEXT(elem)         ((GstOptSchedulerCtx*) (GST_ELEMENT (elem)->sched_private))
40 #define GST_ELEMENT_SCHED_GROUP(elem)           (GST_ELEMENT_SCHED_CONTEXT (elem)->group)
41 #define GST_PAD_BUFLIST(pad)                    ((GList*) (GST_REAL_PAD(pad)->sched_private))
42
43 #define GST_ELEMENT_COTHREAD_STOPPING                   GST_ELEMENT_SCHEDULER_PRIVATE1
44 #define GST_ELEMENT_IS_COTHREAD_STOPPING(element)       GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
45 #define GST_ELEMENT_INTERRUPTED                         GST_ELEMENT_SCHEDULER_PRIVATE2
46 #define GST_ELEMENT_IS_INTERRUPTED(element)             GST_FLAG_IS_SET((element), GST_ELEMENT_INTERRUPTED)
47
48 typedef struct _GstOptScheduler GstOptScheduler;
49 typedef struct _GstOptSchedulerClass GstOptSchedulerClass;
50
51 #define GST_TYPE_OPT_SCHEDULER \
52   (gst_opt_scheduler_get_type())
53 #define GST_OPT_SCHEDULER(obj) \
54   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_OPT_SCHEDULER,GstOptScheduler))
55 #define GST_OPT_SCHEDULER_CLASS(klass) \
56   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_OPT_SCHEDULER,GstOptSchedulerClass))
57 #define GST_IS_OPT_SCHEDULER(obj) \
58   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_OPT_SCHEDULER))
59 #define GST_IS_OPT_SCHEDULER_CLASS(obj) \
60   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_OPT_SCHEDULER))
61
62 typedef enum {
63   GST_OPT_SCHEDULER_STATE_NONE,
64   GST_OPT_SCHEDULER_STATE_STOPPED,
65   GST_OPT_SCHEDULER_STATE_ERROR,
66   GST_OPT_SCHEDULER_STATE_RUNNING,
67   GST_OPT_SCHEDULER_STATE_INTERRUPTED
68 } GstOptSchedulerState;
69
70 struct _GstOptScheduler {
71   GstScheduler           parent;
72
73   GstOptSchedulerState   state;
74
75 #ifdef USE_COTHREADS
76   cothread_context      *context;
77 #endif
78   gint                   iterations;
79
80   GSList                *elements;
81   GSList                *chains;
82
83   GList                 *runqueue;
84   gint                   recursion;
85
86   gint                   max_recursion;
87 };
88
89 struct _GstOptSchedulerClass {
90   GstSchedulerClass parent_class;
91 };
92
93 static GType _gst_opt_scheduler_type = 0;
94
95 typedef enum {
96   GST_OPT_SCHEDULER_CHAIN_DIRTY                 = (1 << 1),     
97   GST_OPT_SCHEDULER_CHAIN_DISABLED              = (1 << 2),
98   GST_OPT_SCHEDULER_CHAIN_RUNNING               = (1 << 3),
99 } GstOptSchedulerChainFlags;
100
101 #define GST_OPT_SCHEDULER_CHAIN_DISABLE(chain)          ((chain)->flags |= GST_OPT_SCHEDULER_CHAIN_DISABLED)
102 #define GST_OPT_SCHEDULER_CHAIN_ENABLE(chain)           ((chain)->flags &= ~GST_OPT_SCHEDULER_CHAIN_DISABLED)
103 #define GST_OPT_SCHEDULER_CHAIN_IS_DISABLED(chain)      ((chain)->flags & GST_OPT_SCHEDULER_CHAIN_DISABLED)
104
105 typedef struct _GstOptSchedulerChain GstOptSchedulerChain;
106
107 struct _GstOptSchedulerChain {
108   gint                           refcount;
109   
110   GstOptScheduler               *sched;
111
112   GstOptSchedulerChainFlags      flags;
113   
114   GSList                        *groups;                        /* the groups in this chain */
115   gint                           num_groups;
116   gint                           num_enabled;
117 };
118
119 /* 
120  * elements that are scheduled in one cothread 
121  */
122 typedef enum {
123   GST_OPT_SCHEDULER_GROUP_DIRTY                 = (1 << 1),     /* this group has been modified */
124   GST_OPT_SCHEDULER_GROUP_COTHREAD_STOPPING     = (1 << 2),     /* the group's cothread stops after one iteration */
125   GST_OPT_SCHEDULER_GROUP_DISABLED              = (1 << 3),     /* this group is disabled */
126   GST_OPT_SCHEDULER_GROUP_RUNNING               = (1 << 4),     /* this group is running */
127   GST_OPT_SCHEDULER_GROUP_SCHEDULABLE           = (1 << 5),     /* this group is schedulable */
128   GST_OPT_SCHEDULER_GROUP_VISITED               = (1 << 6),     /* this group is visited when finding links */
129 } GstOptSchedulerGroupFlags;
130
131 typedef enum {
132   GST_OPT_SCHEDULER_GROUP_GET                   = 1,
133   GST_OPT_SCHEDULER_GROUP_LOOP                  = 2,
134 } GstOptSchedulerGroupType;
135
136 #define GST_OPT_SCHEDULER_GROUP_SET_FLAG(group,flag)    ((group)->flags |= (flag))
137 #define GST_OPT_SCHEDULER_GROUP_UNSET_FLAG(group,flag)  ((group)->flags &= ~(flag))
138 #define GST_OPT_SCHEDULER_GROUP_IS_FLAG_SET(group,flag) ((group)->flags & (flag))
139
140 #define GST_OPT_SCHEDULER_GROUP_DISABLE(group)          ((group)->flags |= GST_OPT_SCHEDULER_GROUP_DISABLED)
141 #define GST_OPT_SCHEDULER_GROUP_ENABLE(group)           ((group)->flags &= ~GST_OPT_SCHEDULER_GROUP_DISABLED)
142 #define GST_OPT_SCHEDULER_GROUP_IS_ENABLED(group)       (!((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED))
143 #define GST_OPT_SCHEDULER_GROUP_IS_DISABLED(group)      ((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED)
144
145
146 typedef struct _GstOptSchedulerGroup GstOptSchedulerGroup;
147 typedef struct _GstOptSchedulerGroupLink GstOptSchedulerGroupLink;
148
149 /* used to keep track of links with other groups */
150 struct _GstOptSchedulerGroupLink {
151   GstOptSchedulerGroup  *group1;        /* the group we are linked with */
152   GstOptSchedulerGroup  *group2;        /* the group we are linked with */
153   gint                   count;         /* the number of links with the group */
154 };
155
156 #define IS_GROUP_LINK(link, group1, group2)     ((link->group1 == group1 && link->group2 == group2) || \
157                                                  (link->group2 == group1 && link->group1 == group2))
158 #define OTHER_GROUP_LINK(link, group)           (link->group1 == group ? link->group2 : link->group1)
159
160 typedef int (*GroupScheduleFunction)    (int argc, char *argv[]);
161
162 struct _GstOptSchedulerGroup {
163   GstOptSchedulerChain          *chain;                 /* the chain this group belongs to */
164   GstOptSchedulerGroupFlags      flags;                 /* flags for this group */
165   GstOptSchedulerGroupType       type;                  /* flags for this group */
166
167   gint                           refcount;
168
169   GSList                        *elements;              /* elements of this group */
170   gint                           num_elements;
171   gint                           num_enabled;
172   GstElement                    *entry;                 /* the group's entry point */
173
174   GSList                        *group_links;           /* other groups that are linked with this group */
175
176 #ifdef USE_COTHREADS
177   cothread                      *cothread;              /* the cothread of this group */
178 #else
179   GroupScheduleFunction          schedulefunc;
180 #endif
181   int                            argc;
182   char                         **argv;
183 };
184
185
186 /* 
187  * A group is a set of elements through which data can flow without switching
188  * cothreads or without invoking the scheduler's run queue.
189  */
190 static GstOptSchedulerGroup*    ref_group                       (GstOptSchedulerGroup *group);
191 static GstOptSchedulerGroup*    unref_group                     (GstOptSchedulerGroup *group);
192 static GstOptSchedulerGroup*    create_group                    (GstOptSchedulerChain *chain,
193                                                                  GstElement *element,
194                                                                  GstOptSchedulerGroupType type);
195 static void                     destroy_group                   (GstOptSchedulerGroup *group);
196 static GstOptSchedulerGroup*    add_to_group                    (GstOptSchedulerGroup *group,
197                                                                  GstElement *element);
198 static GstOptSchedulerGroup*    remove_from_group               (GstOptSchedulerGroup *group,
199                                                                  GstElement *element);
200 static GstOptSchedulerGroup*    merge_groups                    (GstOptSchedulerGroup *group1,
201                                                                  GstOptSchedulerGroup *group2);
202 static void                     setup_group_scheduler           (GstOptScheduler *osched,
203                                                                  GstOptSchedulerGroup *group);
204 static void                     destroy_group_scheduler         (GstOptSchedulerGroup *group);
205 static void                     group_error_handler             (GstOptSchedulerGroup *group);
206 static void                     group_element_set_enabled       (GstOptSchedulerGroup *group, 
207                                                                  GstElement *element,
208                                                                  gboolean enabled);
209 static gboolean                 schedule_group                  (GstOptSchedulerGroup *group);
210
211
212 /* 
213  * A chain is a set of groups that are linked to each other.
214  */
215 static void                     destroy_chain                   (GstOptSchedulerChain *chain);
216 static GstOptSchedulerChain*    create_chain                    (GstOptScheduler *osched);
217 static GstOptSchedulerChain*    ref_chain                       (GstOptSchedulerChain *chain);
218 static GstOptSchedulerChain*    unref_chain                     (GstOptSchedulerChain *chain);
219 static GstOptSchedulerChain*    add_to_chain                    (GstOptSchedulerChain *chain,
220                                                                  GstOptSchedulerGroup *group);
221 static GstOptSchedulerChain*    remove_from_chain               (GstOptSchedulerChain *chain,
222                                                                  GstOptSchedulerGroup *group);
223 static GstOptSchedulerChain*    merge_chains                    (GstOptSchedulerChain *chain1,
224                                                                  GstOptSchedulerChain *chain2);
225 static void                     chain_recursively_migrate_group (GstOptSchedulerChain *chain,
226                                                                  GstOptSchedulerGroup *group);
227 static void                     chain_group_set_enabled         (GstOptSchedulerChain *chain, 
228                                                                  GstOptSchedulerGroup *group,
229                                                                  gboolean enabled);
230 static void                     schedule_chain                  (GstOptSchedulerChain *chain);
231
232
233 /*
234  * The schedule functions are the entry points for cothreads, or called directly
235  * by gst_opt_scheduler_schedule_run_queue
236  */
237 static int                      get_group_schedule_function     (int argc, char *argv[]);
238 static int                      loop_group_schedule_function    (int argc, char *argv[]);
239 static int                      unknown_group_schedule_function (int argc, char *argv[]);
240
241
242 /*
243  * These wrappers are set on the pads as the chain handler (what happens when
244  * gst_pad_push is called) or get handler (for gst_pad_pull).
245  */
246 static void                     gst_opt_scheduler_loop_wrapper  (GstPad *sinkpad, GstData *data);
247 static GstData*                 gst_opt_scheduler_get_wrapper   (GstPad *srcpad);
248 static void                     gst_opt_scheduler_chain_wrapper (GstPad *sinkpad, GstData *data);
249
250
251 /*
252  * Without cothreads, gst_pad_push or gst_pad_pull on a loop-based group will
253  * just queue the peer element on a list. We need to actually run the queue
254  * instead of relying on cothreads to do the switch for us.
255  */
256 #ifndef USE_COTHREADS
257 static void                     gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched);
258 #endif
259
260
261 /* 
262  * Scheduler private data for an element 
263  */
264 typedef struct _GstOptSchedulerCtx GstOptSchedulerCtx;
265
266 typedef enum {
267   GST_OPT_SCHEDULER_CTX_DISABLED                = (1 << 1),     /* the element is disabled */
268 } GstOptSchedulerCtxFlags;
269
270 struct _GstOptSchedulerCtx {
271   GstOptSchedulerGroup *group;                          /* the group this element belongs to */
272
273   GstOptSchedulerCtxFlags flags;                        /* flags for this element */
274 };
275
276  
277 /*
278  * Implementation of GstScheduler
279  */
280 enum
281 {
282   ARG_0,
283   ARG_ITERATIONS,
284   ARG_MAX_RECURSION,
285 };
286
287 static void             gst_opt_scheduler_class_init            (GstOptSchedulerClass *klass);
288 static void             gst_opt_scheduler_init                  (GstOptScheduler *scheduler);
289
290 static void             gst_opt_scheduler_set_property          (GObject *object, guint prop_id,
291                                                                  const GValue *value, GParamSpec *pspec);
292 static void             gst_opt_scheduler_get_property          (GObject *object, guint prop_id,
293                                                                  GValue *value, GParamSpec *pspec);
294
295 static void             gst_opt_scheduler_dispose               (GObject *object);
296
297 static void             gst_opt_scheduler_setup                 (GstScheduler *sched);
298 static void             gst_opt_scheduler_reset                 (GstScheduler *sched);
299 static void             gst_opt_scheduler_add_element           (GstScheduler *sched, GstElement *element);
300 static void             gst_opt_scheduler_remove_element        (GstScheduler *sched, GstElement *element);
301 static GstElementStateReturn  
302                         gst_opt_scheduler_state_transition      (GstScheduler *sched, GstElement *element, gint transition);
303 static void             gst_opt_scheduler_scheduling_change     (GstScheduler *sched, GstElement *element);
304 static void             gst_opt_scheduler_lock_element          (GstScheduler *sched, GstElement *element);
305 static void             gst_opt_scheduler_unlock_element        (GstScheduler *sched, GstElement *element);
306 static gboolean         gst_opt_scheduler_yield                 (GstScheduler *sched, GstElement *element);
307 static gboolean         gst_opt_scheduler_interrupt             (GstScheduler *sched, GstElement *element);
308 static void             gst_opt_scheduler_error                 (GstScheduler *sched, GstElement *element);
309 static void             gst_opt_scheduler_pad_link              (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
310 static void             gst_opt_scheduler_pad_unlink            (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
311 static void             gst_opt_scheduler_pad_select            (GstScheduler *sched, GList *padlist);
312 static GstSchedulerState
313                         gst_opt_scheduler_iterate               (GstScheduler *sched);
314
315 static void             gst_opt_scheduler_show                  (GstScheduler *sched);
316
317 static GstSchedulerClass *parent_class = NULL;
318
319 static GType
320 gst_opt_scheduler_get_type (void)
321 {
322   if (!_gst_opt_scheduler_type) {
323     static const GTypeInfo scheduler_info = {
324       sizeof (GstOptSchedulerClass),
325       NULL,
326       NULL,
327       (GClassInitFunc) gst_opt_scheduler_class_init,
328       NULL,
329       NULL,
330       sizeof (GstOptScheduler),
331       0,
332       (GInstanceInitFunc) gst_opt_scheduler_init,
333       NULL
334     };
335
336     _gst_opt_scheduler_type = g_type_register_static (GST_TYPE_SCHEDULER, 
337                     "GstOpt"COTHREADS_NAME_CAPITAL"Scheduler", &scheduler_info, 0);
338   }
339   return _gst_opt_scheduler_type;
340 }
341
342 static void
343 gst_opt_scheduler_class_init (GstOptSchedulerClass *klass)
344 {
345   GObjectClass *gobject_class;
346   GstObjectClass *gstobject_class;
347   GstSchedulerClass *gstscheduler_class;
348
349   gobject_class = (GObjectClass*)klass;
350   gstobject_class = (GstObjectClass*)klass;
351   gstscheduler_class = (GstSchedulerClass*)klass;
352
353   parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
354
355   gobject_class->set_property   = GST_DEBUG_FUNCPTR (gst_opt_scheduler_set_property);
356   gobject_class->get_property   = GST_DEBUG_FUNCPTR (gst_opt_scheduler_get_property);
357   gobject_class->dispose        = GST_DEBUG_FUNCPTR (gst_opt_scheduler_dispose);
358
359   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ITERATIONS,
360     g_param_spec_int ("iterations", "Iterations", 
361                       "Number of groups to schedule in one iteration (-1 == until EOS/error)",
362                       -1, G_MAXINT, 1, G_PARAM_READWRITE));
363 #ifndef USE_COTHREADS
364   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_RECURSION,
365     g_param_spec_int ("max_recursion", "Max recursion", 
366                       "Maximum number of recursions",
367                       1, G_MAXINT, 100, G_PARAM_READWRITE));
368 #endif
369
370   gstscheduler_class->setup             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_setup);
371   gstscheduler_class->reset             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_reset);
372   gstscheduler_class->add_element       = GST_DEBUG_FUNCPTR (gst_opt_scheduler_add_element);
373   gstscheduler_class->remove_element    = GST_DEBUG_FUNCPTR (gst_opt_scheduler_remove_element);
374   gstscheduler_class->state_transition  = GST_DEBUG_FUNCPTR (gst_opt_scheduler_state_transition);
375   gstscheduler_class->scheduling_change = GST_DEBUG_FUNCPTR (gst_opt_scheduler_scheduling_change);
376   gstscheduler_class->lock_element      = GST_DEBUG_FUNCPTR (gst_opt_scheduler_lock_element);
377   gstscheduler_class->unlock_element    = GST_DEBUG_FUNCPTR (gst_opt_scheduler_unlock_element);
378   gstscheduler_class->yield             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_yield);
379   gstscheduler_class->interrupt         = GST_DEBUG_FUNCPTR (gst_opt_scheduler_interrupt);
380   gstscheduler_class->error             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_error);
381   gstscheduler_class->pad_link          = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_link);
382   gstscheduler_class->pad_unlink        = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_unlink);
383   gstscheduler_class->pad_select        = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_select);
384   gstscheduler_class->clock_wait        = NULL;
385   gstscheduler_class->iterate           = GST_DEBUG_FUNCPTR (gst_opt_scheduler_iterate);
386   gstscheduler_class->show              = GST_DEBUG_FUNCPTR (gst_opt_scheduler_show);
387   
388 #ifdef USE_COTHREADS
389   do_cothreads_init(NULL);
390 #endif
391 }
392
393 static void
394 gst_opt_scheduler_init (GstOptScheduler *scheduler)
395 {
396   scheduler->elements = NULL;
397   scheduler->iterations = 1;
398   scheduler->max_recursion = 100;
399 }
400
401 static void
402 gst_opt_scheduler_dispose (GObject *object)
403 {
404   G_OBJECT_CLASS (parent_class)->dispose (object);
405 }
406
407 static gboolean
408 plugin_init (GstPlugin *plugin)
409 {
410   GstSchedulerFactory *factory;
411
412   GST_DEBUG_CATEGORY_INIT (debug_scheduler, "scheduler", 0, "optimal scheduler");
413
414 #ifdef USE_COTHREADS
415   factory = gst_scheduler_factory_new ("opt"COTHREADS_NAME,
416                                        "An optimal scheduler using "COTHREADS_NAME" cothreads",
417                                       gst_opt_scheduler_get_type());
418 #else
419   factory = gst_scheduler_factory_new ("opt",
420                                        "An optimal scheduler using no cothreads",
421                                       gst_opt_scheduler_get_type());
422 #endif
423
424   if (factory != NULL) {
425     gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory));
426   }
427   else {
428     g_warning ("could not register scheduler: optimal");
429   }
430   return TRUE;
431 }
432
433 GST_PLUGIN_DEFINE (
434   GST_VERSION_MAJOR,
435   GST_VERSION_MINOR,
436   "gstopt"COTHREADS_NAME"scheduler",
437   "An optimal scheduler using "COTHREADS_NAME" cothreads",
438   plugin_init,
439   VERSION,
440   GST_LICENSE,
441   GST_PACKAGE,
442   GST_ORIGIN
443 );
444
445
446 static GstOptSchedulerChain*
447 ref_chain (GstOptSchedulerChain *chain)
448 {
449   GST_LOG ("ref chain %p %d->%d", chain, 
450            chain->refcount, chain->refcount+1);
451   chain->refcount++;
452
453   return chain;
454 }
455
456 static GstOptSchedulerChain*
457 unref_chain (GstOptSchedulerChain *chain)
458 {
459   GST_LOG ("unref chain %p %d->%d", chain, 
460            chain->refcount, chain->refcount-1);
461
462   if (--chain->refcount == 0) {
463     destroy_chain (chain);
464     chain = NULL;
465   }
466
467   return chain;
468 }
469
470 static GstOptSchedulerChain*
471 create_chain (GstOptScheduler *osched)
472 {
473   GstOptSchedulerChain *chain;
474
475   chain = g_new0 (GstOptSchedulerChain, 1);
476   chain->sched = osched;
477   chain->refcount = 1;
478   chain->flags = GST_OPT_SCHEDULER_CHAIN_DISABLED;
479
480   gst_object_ref (GST_OBJECT (osched));
481   osched->chains = g_slist_prepend (osched->chains, chain);
482
483   GST_LOG ( "new chain %p", chain);
484
485   return chain;
486 }
487
488 static void
489 destroy_chain (GstOptSchedulerChain *chain)
490 {
491   GstOptScheduler *osched;
492   
493   GST_LOG ( "destroy chain %p", chain);
494
495   g_assert (chain->num_groups == 0);
496   g_assert (chain->groups == NULL);
497
498   osched = chain->sched;
499   osched->chains = g_slist_remove (osched->chains, chain);
500
501   gst_object_unref (GST_OBJECT (osched));
502
503   g_free (chain);
504 }
505
506 static GstOptSchedulerChain*
507 add_to_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
508 {
509   GST_LOG ("adding group %p to chain %p", group, chain);
510
511   g_assert (group->chain == NULL);
512
513   group = ref_group (group);
514
515   group->chain = ref_chain (chain);
516
517   /* The first non-disabled group in the chain's group list will be the entry
518      point for the chain. Because buffers can accumulate in loop elements' peer
519      bufpens, we preferentially schedule loop groups before get groups to avoid
520      unnecessary execution of get-based groups when the bufpens are already
521      full. */
522   if (group->type == GST_OPT_SCHEDULER_GROUP_LOOP)
523     chain->groups = g_slist_prepend (chain->groups, group);
524   else
525     chain->groups = g_slist_append (chain->groups, group);
526     
527   chain->num_groups++;
528
529   if (GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group)) {
530     chain_group_set_enabled (chain, group, TRUE);
531   }
532
533   return chain;
534 }
535
536 static GstOptSchedulerChain*
537 remove_from_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
538 {
539   GST_LOG ("removing group %p from chain %p", group, chain);
540
541   if (!chain)
542     return NULL;
543
544   g_assert (group);
545   g_assert (group->chain == chain);
546
547   group->chain = NULL;
548   chain->groups = g_slist_remove (chain->groups, group);
549   chain->num_groups--;
550   unref_group (group);
551
552   if (chain->num_groups == 0) 
553     chain = unref_chain (chain);
554
555   chain = unref_chain (chain);
556   return chain;
557 }
558
559 static GstOptSchedulerChain*
560 merge_chains (GstOptSchedulerChain *chain1, GstOptSchedulerChain *chain2)
561 {
562   GSList *walk;
563
564   g_assert (chain1 != NULL);
565   
566   GST_LOG ("merging chain %p and %p", chain1, chain2);
567   
568   /* FIXME: document how chain2 can be NULL */
569   if (chain1 == chain2 || chain2 == NULL)
570     return chain1;
571
572   /* switch if it's more efficient */
573   if (chain1->num_groups < chain2->num_groups) {
574     GstOptSchedulerChain *tmp = chain2;
575     chain2 = chain1;
576     chain1 = tmp;
577   }
578
579   walk = chain2->groups;
580   while (walk) {
581     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
582     walk = g_slist_next (walk);
583
584     GST_LOG ("reparenting group %p from chain %p to %p", 
585              group, chain2, chain1);
586
587     ref_group (group);
588     
589     remove_from_chain (chain2, group);
590     add_to_chain (chain1, group);
591
592     unref_group (group);
593   }
594
595   /* chain2 is now freed, if nothing else was referencing it before */
596
597   return chain1;
598 }
599
600 static void
601 chain_group_set_enabled (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group, gboolean enabled)
602 {
603   g_assert (group != NULL);
604   g_assert (chain != NULL);
605
606   GST_LOG ("request to %d group %p in chain %p, have %d groups enabled out of %d", 
607            enabled, group, chain, chain->num_enabled, chain->num_groups);
608
609   if (enabled)
610     GST_OPT_SCHEDULER_GROUP_ENABLE (group);
611   else 
612     GST_OPT_SCHEDULER_GROUP_DISABLE (group);
613
614   if (enabled) {
615     if (chain->num_enabled < chain->num_groups)
616       chain->num_enabled++;
617
618     GST_DEBUG ("enable group %p in chain %p, now %d groups enabled out of %d", group, chain,
619                chain->num_enabled, chain->num_groups);
620
621     /* OK to call even if the scheduler (cothread context / schedulerfunc) was
622        setup already -- will get destroyed when the group is destroyed */
623     setup_group_scheduler (chain->sched, group);
624
625     if (chain->num_enabled == chain->num_groups) {
626       GST_DEBUG ("enable chain %p", chain);
627       GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
628     }
629   }
630   else {
631     if (chain->num_enabled > 0)
632       chain->num_enabled--;
633
634     GST_DEBUG ("disable group %p in chain %p, now %d groups enabled out of %d", group, chain,
635                chain->num_enabled, chain->num_groups);
636
637     if (chain->num_enabled == 0) {
638       GST_DEBUG ("disable chain %p", chain);
639       GST_OPT_SCHEDULER_CHAIN_DISABLE (chain);
640     }
641   }
642 }
643
644 /* recursively migrate the group and all connected groups into the new chain */
645 static void
646 chain_recursively_migrate_group (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
647 {
648   GSList *links;
649   
650   /* group already in chain */
651   if (group->chain == chain)
652     return;
653
654   /* first remove the group from its old chain */
655   remove_from_chain (group->chain, group);
656   /* add to new chain */
657   add_to_chain (chain, group);
658
659   /* then follow all links */
660   links = group->group_links;
661   while (links) {
662     GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) links->data;
663     links = g_slist_next (links);
664
665     chain_recursively_migrate_group (chain, (link->group1 == group ? link->group2 : link->group1));
666   }
667 }
668
669 static GstOptSchedulerGroup*
670 ref_group (GstOptSchedulerGroup *group)
671 {
672   GST_LOG ("ref group %p %d->%d", group, 
673            group->refcount, group->refcount+1);
674
675   group->refcount++;
676
677   return group;
678 }
679
680 static GstOptSchedulerGroup*
681 unref_group (GstOptSchedulerGroup *group)
682 {
683   GST_LOG ("unref group %p %d->%d", group, 
684            group->refcount, group->refcount-1);
685
686   if (--group->refcount == 0) {
687     destroy_group (group);
688     group = NULL;
689   }
690
691   return group;
692 }
693
694 static GstOptSchedulerGroup*
695 create_group (GstOptSchedulerChain *chain, GstElement *element,
696               GstOptSchedulerGroupType type)
697 {
698   GstOptSchedulerGroup *group;
699
700   group = g_new0 (GstOptSchedulerGroup, 1);
701   GST_LOG ("new group %p", group);
702   group->refcount = 1; /* float... */
703   group->flags = GST_OPT_SCHEDULER_GROUP_DISABLED;
704   group->type = type;
705
706   add_to_group (group, element);
707   add_to_chain (chain, group);
708   group = unref_group (group); /* ...and sink. */
709
710   /* group's refcount is now 2 (one for the element, one for the chain) */
711   
712   return group;
713 }
714
715 static void
716 destroy_group (GstOptSchedulerGroup *group)
717 {
718   GST_LOG ("destroy group %p", group);
719
720   g_assert (group != NULL);
721   g_assert (group->elements == NULL);
722   g_assert (group->chain == NULL);
723   g_assert (group->group_links == NULL);
724
725   if (group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)
726     destroy_group_scheduler (group);
727
728   g_free (group);
729 }
730
731 static GstOptSchedulerGroup*
732 add_to_group (GstOptSchedulerGroup *group, GstElement *element)
733 {
734   g_assert (group != NULL);
735   g_assert (element != NULL);
736
737   GST_DEBUG ("adding element \"%s\" to group %p", GST_ELEMENT_NAME (element), group);
738
739   if (GST_ELEMENT_IS_DECOUPLED (element)) {
740     GST_DEBUG ("element \"%s\" is decoupled, not adding to group %p", 
741                GST_ELEMENT_NAME (element), group);
742     return group;
743   }
744
745   g_assert (GST_ELEMENT_SCHED_GROUP (element) == NULL);
746
747   /* Ref the group... */
748   GST_ELEMENT_SCHED_GROUP (element) = ref_group (group);
749
750   gst_object_ref (GST_OBJECT (element));
751   group->elements = g_slist_prepend (group->elements, element);
752   group->num_elements++;
753
754   if (gst_element_get_state (element) == GST_STATE_PLAYING) {
755     group_element_set_enabled (group, element, TRUE);
756   }
757
758   return group;
759 }
760
761 /* if the element is linked to elements from other groups, you must decrement
762    the link count prior to calling this function */
763 static GstOptSchedulerGroup*
764 remove_from_group (GstOptSchedulerGroup *group, GstElement *element)
765 {
766   GST_DEBUG ("removing element \"%s\" from group %p", GST_ELEMENT_NAME (element), group);
767
768   g_assert (group != NULL);
769   g_assert (element != NULL);
770   g_assert (GST_ELEMENT_SCHED_GROUP (element) == group);
771
772   group->elements = g_slist_remove (group->elements, element);
773   group->num_elements--;
774
775   /* if the element was an entry point in the group, clear the group's
776    * entry point */
777   if (group->entry == element) {
778     group->entry = NULL;
779   }
780
781   GST_ELEMENT_SCHED_GROUP (element) = NULL;
782   gst_object_unref (GST_OBJECT (element));
783
784   if (group->num_elements == 0) {
785     GST_LOG ("group %p is now empty", group);
786     /* don't know in what case group->chain would be NULL, but putting this here
787        in deference to 0.8 -- remove me in 0.9 */
788     if (group->chain) {
789       GST_LOG ("removing group %p from its chain", group);
790       chain_group_set_enabled (group->chain, group, FALSE);
791       remove_from_chain (group->chain, group);
792     }
793   }
794   group = unref_group (group);
795
796   return group;
797 }
798
799 /* FIXME need to check if the groups are of the same type -- otherwise need to
800    setup the scheduler again, if it is setup */
801 static GstOptSchedulerGroup*
802 merge_groups (GstOptSchedulerGroup *group1, GstOptSchedulerGroup *group2)
803 {
804   g_assert (group1 != NULL);
805
806   GST_DEBUG ("merging groups %p and %p", group1, group2);
807   
808   if (group1 == group2 || group2 == NULL)
809     return group1;
810
811   while (group2 && group2->elements) {
812     GstElement *element = (GstElement *)group2->elements->data;
813
814     group2 = remove_from_group (group2, element);
815     add_to_group (group1, element);
816   }
817   
818   return group1;
819 }
820
821 /* setup the scheduler context for a group. The right schedule function
822  * is selected based on the group type and cothreads are created if 
823  * needed */
824 static void 
825 setup_group_scheduler (GstOptScheduler *osched, GstOptSchedulerGroup *group) 
826 {
827   GroupScheduleFunction wrapper;
828
829   wrapper = unknown_group_schedule_function;
830
831   /* figure out the wrapper function for this group */
832   if (group->type == GST_OPT_SCHEDULER_GROUP_GET)
833     wrapper = get_group_schedule_function;
834   else if (group->type == GST_OPT_SCHEDULER_GROUP_LOOP)
835     wrapper = loop_group_schedule_function;
836         
837 #ifdef USE_COTHREADS
838   if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
839     do_cothread_create (group->cothread, osched->context,
840                       (cothread_func) wrapper, 0, (char **) group);
841   }
842   else {
843     do_cothread_setfunc (group->cothread, osched->context,
844                       (cothread_func) wrapper, 0, (char **) group);
845   }
846 #else
847   group->schedulefunc = wrapper;
848   group->argc = 0;
849   group->argv = (char **) group;
850 #endif
851   group->flags |= GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
852 }
853
854 static void 
855 destroy_group_scheduler (GstOptSchedulerGroup *group) 
856 {
857   g_assert (group);
858
859   if (group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)
860     g_warning ("destroying running group scheduler");
861
862 #ifdef USE_COTHREADS
863   if (group->cothread) {
864     do_cothread_destroy (group->cothread);
865     group->cothread = NULL;
866   }
867 #else
868   group->schedulefunc = NULL;
869   group->argc = 0;
870   group->argv = NULL;
871 #endif
872
873   group->flags &= ~GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
874 }
875
876 static void
877 group_error_handler (GstOptSchedulerGroup *group) 
878 {
879   GST_DEBUG ("group %p has errored", group);
880
881   chain_group_set_enabled (group->chain, group, FALSE);
882   group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
883 }
884
885 /* this function enables/disables an element, it will set/clear a flag on the element 
886  * and tells the chain that the group is enabled if all elements inside the group are
887  * enabled */
888 static void
889 group_element_set_enabled (GstOptSchedulerGroup *group, GstElement *element, gboolean enabled)
890 {
891   g_assert (group != NULL);
892   g_assert (element != NULL);
893
894   GST_LOG ("request to %d element %s in group %p, have %d elements enabled out of %d", 
895            enabled, GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
896
897   /* Note that if an unlinked PLAYING element is added to a bin, we have to
898      create a new group to hold the element, and this function will be called
899      before the group is added to the chain. Thus we have a valid case for
900      group->chain==NULL. */
901
902   if (enabled) {
903     if (group->num_enabled < group->num_elements)
904       group->num_enabled++;
905
906     GST_DEBUG ("enable element %s in group %p, now %d elements enabled out of %d", 
907                GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
908
909     if (group->num_enabled == group->num_elements) {
910       if (!group->chain) {
911         GST_DEBUG ("enable chainless group %p", group);
912         GST_OPT_SCHEDULER_GROUP_ENABLE (group);
913       } else {
914         GST_LOG ("enable group %p", group);
915         chain_group_set_enabled (group->chain, group, TRUE);
916       }
917     }
918   }
919   else {
920     if (group->num_enabled > 0)
921       group->num_enabled--;
922
923     GST_DEBUG ("disable element %s in group %p, now %d elements enabled out of %d", 
924                GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
925
926     if (group->num_enabled == 0) {
927       if (!group->chain) {
928         GST_DEBUG ("disable chainless group %p", group);
929         GST_OPT_SCHEDULER_GROUP_DISABLE (group);
930       } else {
931         GST_LOG ("disable group %p", group);
932         chain_group_set_enabled (group->chain, group, FALSE);
933       }
934     }
935   }
936 }
937
938 /* a group is scheduled by doing a cothread switch to it or
939  * by calling the schedule function. In the non-cothread case
940  * we cannot run already running groups so we return FALSE here
941  * to indicate this to the caller */
942 static gboolean 
943 schedule_group (GstOptSchedulerGroup *group) 
944 {
945   if (!group->entry) {
946     GST_INFO ("not scheduling group %p without entry", group);
947     return FALSE;
948   }
949
950 #ifdef USE_COTHREADS
951   if (group->cothread)
952     do_cothread_switch (group->cothread);
953   else
954     g_warning ("(internal error): trying to schedule group without cothread");
955   return TRUE;
956 #else
957   /* cothreads automatically call the pre- and post-run functions for us;
958    * without cothreads we need to call them manually */
959   if (group->schedulefunc == NULL) {
960     GST_INFO ("not scheduling group %p without schedulefunc", group);
961     return FALSE;
962   } else {
963     GSList *l;
964
965     for (l=group->elements; l; l=l->next) {
966       GstElement *e = (GstElement*)l->data;
967       if (e->pre_run_func)
968         e->pre_run_func (e);
969     }
970
971     group->schedulefunc (group->argc, group->argv);
972
973     for (l=group->elements; l; l=l->next) {
974       GstElement *e = (GstElement*)l->data;
975       if (e->post_run_func)
976         e->post_run_func (e);
977     }
978
979   }
980   return TRUE;
981 #endif
982 }
983
984 #ifndef USE_COTHREADS
985 static void
986 gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
987 {
988   GST_LOG_OBJECT (osched, "running queue: %d groups, recursed %d times",
989                   g_list_length (osched->runqueue),
990                   osched->recursion, g_list_length (osched->runqueue));
991
992   /* note that we have a ref on each group on the queue (unref after running) */
993
994   /* make sure we don't exceed max_recursion */
995   if (osched->recursion > osched->max_recursion) {
996     osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
997     return;
998   }
999
1000   osched->recursion++;
1001
1002   while (osched->runqueue) {
1003     GstOptSchedulerGroup *group;
1004     gboolean res;
1005     
1006     group = (GstOptSchedulerGroup *) osched->runqueue->data;
1007
1008     /* runqueue holds refcount to group */
1009     osched->runqueue = g_list_remove (osched->runqueue, group);
1010
1011     GST_LOG_OBJECT (osched, "scheduling group %p", group);
1012
1013     res = schedule_group (group);
1014     if (!res) {
1015       g_warning  ("error scheduling group %p", group);
1016       group_error_handler (group);
1017     }
1018     else {
1019       GST_LOG_OBJECT (osched, "done scheduling group %p", group);
1020     }
1021     unref_group (group);
1022   }
1023
1024   GST_LOG_OBJECT (osched, "run queue length after scheduling %d", g_list_length (osched->runqueue));
1025
1026   osched->recursion--;
1027 }
1028 #endif
1029
1030 /* a chain is scheduled by picking the first active group and scheduling it */
1031 static void
1032 schedule_chain (GstOptSchedulerChain *chain) 
1033 {
1034   GSList *groups;
1035   GstOptScheduler *osched;
1036
1037   osched = chain->sched;
1038   groups = chain->groups;
1039
1040   while (groups) {
1041     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
1042
1043     if (!GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group)) {
1044       ref_group (group);
1045       GST_LOG ("scheduling group %p in chain %p", 
1046                group, chain);
1047
1048 #ifdef USE_COTHREADS
1049       schedule_group (group);
1050 #else
1051       osched->recursion = 0;
1052       if (!g_list_find (osched->runqueue, group))
1053       {
1054         ref_group (group);
1055         osched->runqueue = g_list_append (osched->runqueue, group);
1056       }
1057       gst_opt_scheduler_schedule_run_queue (osched);
1058 #endif
1059
1060       GST_LOG ("done scheduling group %p in chain %p", 
1061                group, chain);
1062       unref_group (group);
1063       break;
1064     }
1065
1066     groups = g_slist_next (groups);
1067   }
1068 }
1069
1070 /* a get-based group is scheduled by getting a buffer from the get based
1071  * entry point and by pushing the buffer to the peer.
1072  * We also set the running flag on this group for as long as this
1073  * function is running. */
1074 static int
1075 get_group_schedule_function (int argc, char *argv[])
1076 {
1077   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1078   GstElement *entry = group->entry;
1079   const GList *pads = gst_element_get_pad_list (entry);
1080
1081   GST_LOG ("executing get-based group %p", group);
1082
1083   group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
1084
1085   while (pads) {
1086     GstData *data;
1087     GstPad *pad = GST_PAD (pads->data);
1088     pads = g_list_next (pads);
1089
1090     /* skip sinks and ghostpads */
1091     if (!GST_PAD_IS_SRC (pad) || !GST_IS_REAL_PAD (pad))
1092       continue;
1093
1094     GST_DEBUG ("doing get and push on pad \"%s:%s\" in group %p", 
1095                GST_DEBUG_PAD_NAME (pad), group);
1096
1097     data = GST_RPAD_GETFUNC (pad) (pad);
1098     if (data) {
1099       if (GST_EVENT_IS_INTERRUPT (data)) {
1100         gst_event_unref (GST_EVENT (data));
1101         break;
1102       }
1103       gst_pad_push (pad, data);
1104     }
1105   }
1106
1107   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
1108
1109   return 0;
1110 }
1111
1112 /* a loop-based group is scheduled by calling the loop function
1113  * on the entry point. 
1114  * We also set the running flag on this group for as long as this
1115  * function is running. */
1116 static int
1117 loop_group_schedule_function (int argc, char *argv[])
1118 {
1119   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1120   GstElement *entry = group->entry;
1121
1122   GST_LOG ("executing loop-based group %p", group);
1123
1124   group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
1125
1126   GST_DEBUG ("calling loopfunc of element %s in group %p", 
1127              GST_ELEMENT_NAME (entry), group);
1128
1129   entry->loopfunc (entry);
1130
1131   GST_LOG ("loopfunc ended of element %s in group %p", 
1132            GST_ELEMENT_NAME (entry), group);
1133
1134   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
1135
1136   return 0;
1137
1138 }
1139
1140 /* the function to schedule an unknown group, which just gives an error */
1141 static int
1142 unknown_group_schedule_function (int argc, char *argv[])
1143 {
1144   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1145
1146   g_warning ("(internal error) unknown group type %d, disabling\n", group->type);
1147   group_error_handler (group);
1148
1149   return 0;
1150 }
1151
1152 /* this function is called when the first element of a chain-loop or a loop-loop
1153  * link performs a push to the loop element. We then schedule the
1154  * group with the loop-based element until the bufpen is empty */
1155 static void
1156 gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstData *data)
1157 {
1158   GstOptSchedulerGroup *group;
1159   GstOptScheduler *osched;
1160   GstRealPad *peer;
1161
1162   group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (sinkpad));
1163   osched = group->chain->sched;
1164   peer = GST_RPAD_PEER (sinkpad);
1165
1166   GST_LOG ("chain handler for loop-based pad %" GST_PTR_FORMAT, sinkpad);
1167
1168 #ifdef USE_COTHREADS
1169   if (GST_PAD_BUFLIST (peer)) {
1170     g_warning ("deadlock detected, disabling group %p", group);
1171     group_error_handler (group);
1172   }
1173   else {
1174     GST_LOG ("queueing data %p on %s:%s's bufpen", data,
1175              GST_DEBUG_PAD_NAME (peer));
1176     GST_PAD_BUFLIST (peer) = g_list_append (GST_PAD_BUFLIST (peer), data);
1177     schedule_group (group);
1178   }
1179 #else
1180   GST_LOG ("queueing data %p on %s:%s's bufpen", data,
1181            GST_DEBUG_PAD_NAME (peer));
1182   GST_PAD_BUFLIST (peer) = g_list_append (GST_PAD_BUFLIST (peer), data);
1183   if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
1184     GST_LOG ("adding group %p to runqueue", group);
1185     if (!g_list_find (osched->runqueue, group))
1186     {
1187       ref_group (group);
1188       osched->runqueue = g_list_append (osched->runqueue, group);
1189     }
1190   }
1191 #endif
1192   
1193   GST_LOG ("%d buffers left on %s:%s's bufpen after chain handler",
1194             g_list_length (GST_PAD_BUFLIST (peer)));
1195 }
1196
1197 /* this function is called by a loop based element that performs a
1198  * pull on a sinkpad. We schedule the peer group until the bufpen
1199  * is filled with the buffer so that this function  can return */
1200 static GstData*
1201 gst_opt_scheduler_get_wrapper (GstPad *srcpad)
1202 {
1203   GstData *data;
1204   GstOptSchedulerGroup *group;
1205   GstOptScheduler *osched;
1206   gboolean disabled;
1207     
1208   GST_LOG ("get handler for %" GST_PTR_FORMAT, srcpad);
1209
1210   /* first try to grab a queued buffer */
1211   if (GST_PAD_BUFLIST (srcpad)) {
1212     data = GST_PAD_BUFLIST (srcpad)->data;
1213     GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), data);
1214     
1215     GST_LOG ("returning popped queued data %p", data);
1216
1217     return data;
1218   }
1219
1220   /* else we need to schedule the peer element */
1221   group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (srcpad));
1222   osched = group->chain->sched;
1223   data = NULL;
1224   disabled = FALSE;
1225
1226   do {
1227     GST_LOG ("scheduling upstream group %p to fill bufpen", group);
1228 #ifdef USE_COTHREADS
1229     schedule_group (group);
1230 #else
1231     if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
1232       ref_group (group);
1233
1234       if (!g_list_find (osched->runqueue, group))
1235       {
1236         ref_group (group);
1237         osched->runqueue = g_list_append (osched->runqueue, group);
1238       }
1239
1240       GST_LOG ("recursing into scheduler group %p", group);
1241       gst_opt_scheduler_schedule_run_queue (osched);
1242       GST_LOG ("return from recurse group %p", group);
1243
1244       /* if the other group was disabled we might have to break out of the loop */
1245       disabled = GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group);
1246       group = unref_group (group);
1247       /* group is gone */
1248       if (group == NULL) {
1249         /* if the group was gone we also might have to break out of the loop */
1250         disabled = TRUE;
1251       }
1252     }
1253     else {
1254       /* in this case, the group was running and we wanted to swtich to it,
1255        * this is not allowed in the optimal scheduler (yet) */
1256       g_warning ("deadlock detected, disabling group %p", group);
1257       group_error_handler (group);
1258       return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1259     }
1260 #endif
1261     /* if the scheduler interrupted, make sure we send an INTERRUPTED event to the
1262      * loop based element */
1263     if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
1264       GST_INFO ("scheduler interrupted, return interrupt event");
1265       data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1266     }
1267     else {
1268       if (GST_PAD_BUFLIST (srcpad)) {
1269         data = GST_PAD_BUFLIST (srcpad)->data;
1270         GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), data);
1271       }
1272       else if (disabled) {
1273         /* no buffer in queue and peer group was disabled */
1274         data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1275       }
1276     }
1277   }
1278   while (data == NULL);
1279
1280   GST_LOG ("get handler, returning data %p, queue length %d",
1281            data, g_list_length (GST_PAD_BUFLIST (srcpad)));
1282
1283   return data;
1284 }
1285
1286 /* this function is a chain wrapper for non-event-aware plugins,
1287  * it'll simply dispatch the events to the (default) event handler */
1288 static void
1289 gst_opt_scheduler_chain_wrapper (GstPad *sinkpad, GstData *data)
1290 {
1291   if (GST_IS_EVENT (data)) {
1292     gst_pad_send_event (sinkpad, GST_EVENT (data));
1293   }
1294   else {
1295     GST_RPAD_CHAINFUNC (sinkpad) (sinkpad, data);
1296   }
1297 }
1298
1299 static void
1300 clear_queued (GstData *data, gpointer user_data)
1301 {
1302   gst_data_unref (data);
1303 }
1304
1305 static void
1306 pad_clear_queued (GstPad *srcpad, gpointer user_data)
1307 {
1308   GList *buflist = GST_PAD_BUFLIST (srcpad);
1309
1310   if (buflist) {
1311     GST_LOG ("need to clear some buffers");
1312     g_list_foreach (buflist, (GFunc) clear_queued, NULL);
1313     g_list_free (buflist);
1314     GST_PAD_BUFLIST (srcpad) = NULL;
1315   }
1316 }
1317
1318 static gboolean
1319 gst_opt_scheduler_event_wrapper (GstPad *srcpad, GstEvent *event)
1320 {
1321   gboolean flush;
1322
1323   GST_DEBUG ("intercepting event %d on pad %s:%s", 
1324              GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (srcpad));
1325   
1326   /* figure out if this is a flush event */
1327   switch (GST_EVENT_TYPE (event)) {
1328     case GST_EVENT_FLUSH:
1329       flush = TRUE;
1330       break;
1331     case GST_EVENT_SEEK:
1332     case GST_EVENT_SEEK_SEGMENT:
1333       flush = GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH;
1334       break;
1335     default:
1336       flush = FALSE;
1337       break;
1338   }
1339
1340   if (flush) {
1341     GST_LOG ("event is flush");
1342
1343     pad_clear_queued (srcpad, NULL);
1344   }
1345   return GST_RPAD_EVENTFUNC (srcpad) (srcpad, event);
1346 }
1347
1348 static GstElementStateReturn
1349 gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition)
1350 {
1351   GstOptSchedulerGroup *group;
1352   GstElementStateReturn res = GST_STATE_SUCCESS;
1353   
1354   GST_DEBUG ("element \"%s\" state change %d", GST_ELEMENT_NAME (element), transition);
1355
1356   /* we check the state of the managing pipeline here */
1357   if (GST_IS_BIN (element)) {
1358     if (GST_SCHEDULER_PARENT (sched) == element) {
1359       GST_LOG ("parent \"%s\" changed state", GST_ELEMENT_NAME (element));
1360
1361       switch (transition) {
1362         case GST_STATE_PLAYING_TO_PAUSED:
1363           GST_INFO ("setting scheduler state to stopped");
1364           GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_STOPPED;
1365           break;
1366         case GST_STATE_PAUSED_TO_PLAYING:
1367           GST_INFO ("setting scheduler state to running");
1368           GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_RUNNING;
1369           break;
1370         default:
1371           GST_LOG ("no interesting state change, doing nothing");
1372       }
1373     }
1374     return res;
1375   }
1376
1377   /* we don't care about decoupled elements after this */
1378   if (GST_ELEMENT_IS_DECOUPLED (element))
1379     return GST_STATE_SUCCESS;
1380
1381   /* get the group of the element */
1382   group = GST_ELEMENT_SCHED_GROUP (element);
1383
1384   switch (transition) {
1385     case GST_STATE_PAUSED_TO_PLAYING:
1386       /* an element withut a group has to be an unlinked src, sink
1387        * filter element */
1388       if (!group) {
1389         GST_INFO ("element \"%s\" has no group", GST_ELEMENT_NAME (element));
1390         res = GST_STATE_FAILURE;
1391       }
1392       /* else construct the scheduling context of this group and enable it */
1393       else {
1394         group_element_set_enabled (group, element, TRUE);
1395       }
1396       break;
1397     case GST_STATE_PLAYING_TO_PAUSED:
1398       /* if the element still has a group, we disable it */
1399       if (group) 
1400         group_element_set_enabled (group, element, FALSE);
1401       break;
1402     case GST_STATE_PAUSED_TO_READY:
1403     {  
1404       GList *pads = (GList *) gst_element_get_pad_list (element);
1405
1406       g_list_foreach (pads, (GFunc) pad_clear_queued, NULL);
1407       break;
1408     }
1409     default:
1410       break;
1411   }
1412
1413   return res;
1414 }
1415
1416 static void
1417 gst_opt_scheduler_scheduling_change (GstScheduler *sched, GstElement *element)
1418 {
1419   g_warning ("scheduling change, implement me");
1420 }
1421
1422 static void
1423 get_group (GstElement *element, GstOptSchedulerGroup **group)
1424 {
1425   GstOptSchedulerCtx *ctx;
1426   /*GList *pads;*/
1427
1428   ctx = GST_ELEMENT_SCHED_CONTEXT (element);
1429   if (ctx) 
1430     *group = ctx->group;
1431   else
1432     *group = NULL;
1433 }
1434
1435 /*
1436  * the idea is to put the two elements into the same group. 
1437  * - When no element is inside a group, we create a new group and add 
1438  *   the elements to it. 
1439  * - When one of the elements has a group, add the other element to 
1440  *   that group
1441  * - if both of the elements have a group, we merge the groups, which
1442  *   will also merge the chains.
1443  */
1444 static GstOptSchedulerGroup*
1445 group_elements (GstOptScheduler *osched, GstElement *element1, GstElement *element2,
1446                 GstOptSchedulerGroupType type)
1447 {
1448   GstOptSchedulerGroup *group1, *group2, *group = NULL;
1449   
1450   get_group (element1, &group1);
1451   get_group (element2, &group2);
1452   
1453   /* none of the elements is added to a group, create a new group
1454    * and chain to add the elements to */
1455   if (!group1 && !group2) {
1456     GstOptSchedulerChain *chain;
1457
1458     GST_DEBUG ("creating new group to hold \"%s\" and \"%s\"", 
1459               GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
1460
1461     chain = create_chain (osched);
1462     group = create_group (chain, element1, type);
1463     add_to_group (group, element2);
1464   }
1465   /* the first element has a group */
1466   else if (group1) {
1467     GST_DEBUG ("adding \"%s\" to \"%s\"'s group", 
1468                GST_ELEMENT_NAME (element2), GST_ELEMENT_NAME (element1));
1469
1470     /* the second element also has a group, merge */
1471     if (group2)
1472       merge_groups (group1, group2);
1473     /* the second element has no group, add it to the group
1474      * of the first element */
1475     else
1476       add_to_group (group1, element2);
1477
1478     group = group1;
1479   }
1480   /* element1 has no group, element2 does. Add element1 to the
1481    * group of element2 */
1482   else {
1483     GST_DEBUG ("adding \"%s\" to \"%s\"'s group", 
1484                GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
1485     add_to_group (group2, element1);
1486     group = group2;
1487   }
1488   return group;
1489 }
1490
1491 /*
1492  * increment link counts between groups
1493  */
1494 static void
1495 group_inc_link (GstOptSchedulerGroup *group1, GstOptSchedulerGroup *group2)
1496 {
1497   GSList *links = group1->group_links;
1498   gboolean done = FALSE;
1499   GstOptSchedulerGroupLink *link;
1500
1501   /* first try to find a previous link */
1502   while (links && !done) {
1503     link = (GstOptSchedulerGroupLink *) links->data;
1504     links = g_slist_next (links);
1505     
1506     if (IS_GROUP_LINK (link, group1, group2)) {
1507       /* we found a link to this group, increment the link count */
1508       link->count++;
1509       GST_LOG ("incremented group link count between %p and %p to %d", 
1510                group1, group2, link->count);
1511       done = TRUE;
1512     }
1513   }
1514   if (!done) {
1515     /* no link was found, create a new one */
1516     link = g_new0 (GstOptSchedulerGroupLink, 1);
1517
1518     link->group1 = group1;
1519     link->group2 = group2;
1520     link->count = 1;
1521
1522     group1->group_links = g_slist_prepend (group1->group_links, link);
1523     group2->group_links = g_slist_prepend (group2->group_links, link);
1524
1525     GST_DEBUG ("added group link between %p and %p", 
1526                group1, group2);
1527   }
1528 }
1529
1530 /*
1531  * decrement link counts between groups, returns TRUE if the link count reaches 0
1532  */
1533 static gboolean
1534 group_dec_link (GstOptSchedulerGroup *group1, GstOptSchedulerGroup *group2)
1535 {
1536   GSList *links = group1->group_links;
1537   gboolean res = FALSE;
1538   GstOptSchedulerGroupLink *link;
1539
1540   while (links) {
1541     link = (GstOptSchedulerGroupLink *) links->data;
1542     links = g_slist_next (links);
1543     
1544     if (IS_GROUP_LINK (link, group1, group2)) {
1545       link->count--;
1546       GST_LOG ("link count between %p and %p is now %d", 
1547                group1, group2, link->count);
1548       if (link->count == 0) {
1549         group1->group_links = g_slist_remove (group1->group_links, link);
1550         group2->group_links = g_slist_remove (group2->group_links, link);
1551         g_free (link);
1552         GST_DEBUG ("removed group link between %p and %p", 
1553                    group1, group2);
1554         res = TRUE;
1555       }
1556       break;
1557     }
1558   }
1559   return res;
1560 }
1561
1562
1563 typedef enum {
1564   GST_OPT_INVALID,
1565   GST_OPT_GET_TO_CHAIN,
1566   GST_OPT_LOOP_TO_CHAIN,
1567   GST_OPT_GET_TO_LOOP,
1568   GST_OPT_CHAIN_TO_CHAIN,
1569   GST_OPT_CHAIN_TO_LOOP,
1570   GST_OPT_LOOP_TO_LOOP,
1571 } LinkType;
1572
1573 /*
1574  * Entry points for this scheduler.
1575  */
1576 static void
1577 gst_opt_scheduler_setup (GstScheduler *sched)
1578 {   
1579 #ifdef USE_COTHREADS
1580   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1581               
1582   /* first create thread context */
1583   if (osched->context == NULL) {
1584     GST_DEBUG ("initializing cothread context");
1585     osched->context = do_cothread_context_init ();
1586   }
1587 #endif
1588
1589   
1590 static void 
1591 gst_opt_scheduler_reset (GstScheduler *sched)
1592
1593 #ifdef USE_COTHREADS
1594   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1595   GSList *chains = osched->chains;
1596
1597   while (chains) {
1598     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
1599     GSList *groups = chain->groups;
1600
1601     while (groups) {
1602       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
1603
1604       destroy_group_scheduler (group);
1605       groups = groups->next;
1606     }
1607     chains = chains->next;
1608   }
1609               
1610   if (osched->context) {
1611     do_cothread_context_destroy (osched->context);
1612     osched->context = NULL; 
1613   }
1614 #endif
1615 }     
1616 static void
1617 gst_opt_scheduler_add_element (GstScheduler *sched, GstElement *element)
1618 {
1619   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1620   GstOptSchedulerCtx *ctx;
1621   const GList *pads; 
1622
1623   GST_DEBUG_OBJECT (sched, "adding element \"%s\"", GST_OBJECT_NAME (element));
1624
1625   /* decoupled elements are not added to the scheduler lists */
1626   if (GST_ELEMENT_IS_DECOUPLED (element))
1627     return;
1628
1629   ctx = g_new0 (GstOptSchedulerCtx, 1);
1630   GST_ELEMENT_SCHED_CONTEXT (element) = ctx;
1631   ctx->flags = GST_OPT_SCHEDULER_CTX_DISABLED;
1632
1633   /* set event handler on all pads here so events work unconnected too;
1634    * in _link, it can be overruled if need be */
1635   /* FIXME: we should also do this when new pads on the element are created;
1636      but there are no hooks, so we do it again in _link */
1637   pads = gst_element_get_pad_list (element);
1638   while (pads) {
1639     GstPad *pad = GST_PAD (pads->data);
1640     pads = g_list_next (pads);
1641
1642     if (!GST_IS_REAL_PAD (pad)) continue;
1643     GST_RPAD_EVENTHANDLER (pad) = GST_RPAD_EVENTFUNC (pad);
1644   }
1645
1646   /* loop based elements *always* end up in their own group. It can eventually
1647    * be merged with another group when a link is made */
1648   if (element->loopfunc) {
1649     GstOptSchedulerGroup *group;
1650     GstOptSchedulerChain *chain;
1651
1652     chain = create_chain (osched);
1653
1654     group = create_group (chain, element, GST_OPT_SCHEDULER_GROUP_LOOP);
1655     group->entry = element;
1656
1657     GST_LOG ("added element \"%s\" as loop based entry", GST_ELEMENT_NAME (element));
1658   }
1659 }
1660
1661 static void
1662 gst_opt_scheduler_remove_element (GstScheduler *sched, GstElement *element)
1663 {
1664   GstOptSchedulerGroup *group;
1665
1666   GST_DEBUG_OBJECT (sched, "removing element \"%s\"", GST_OBJECT_NAME (element));
1667
1668   /* decoupled elements are not added to the scheduler lists and should therefore
1669    * no be removed */
1670   if (GST_ELEMENT_IS_DECOUPLED (element))
1671     return;
1672
1673   /* the element is guaranteed to live in it's own group/chain now */
1674   get_group (element, &group);
1675   if (group) {
1676     remove_from_group (group, element);
1677   }
1678
1679   g_free (GST_ELEMENT_SCHED_CONTEXT (element));
1680   GST_ELEMENT_SCHED_CONTEXT (element) = NULL;
1681 }
1682
1683 static void
1684 gst_opt_scheduler_lock_element (GstScheduler *sched, GstElement *element)
1685 {
1686   //GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1687   g_warning ("lock element, implement me");
1688 }
1689
1690 static void
1691 gst_opt_scheduler_unlock_element (GstScheduler *sched, GstElement *element)
1692 {
1693   //GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1694   g_warning ("unlock element, implement me");
1695 }
1696
1697 static gboolean
1698 gst_opt_scheduler_yield (GstScheduler *sched, GstElement *element)
1699 {
1700 #ifdef USE_COTHREADS
1701   /* yield hands control to the main cothread context if the requesting 
1702    * element is the entry point of the group */
1703   GstOptSchedulerGroup *group;
1704   get_group (element, &group);
1705   if (group && group->entry == element)
1706     do_cothread_switch (do_cothread_get_main (((GstOptScheduler*)sched)->context)); 
1707
1708   return FALSE;
1709 #else
1710   g_warning ("element %s performs a yield, please fix the element", 
1711                   GST_ELEMENT_NAME (element));
1712   return TRUE;
1713 #endif
1714 }
1715
1716 static gboolean
1717 gst_opt_scheduler_interrupt (GstScheduler *sched, GstElement *element)
1718 {
1719   GST_INFO ("interrupt from \"%s\"", 
1720             GST_OBJECT_NAME (element));
1721
1722 #ifdef USE_COTHREADS
1723   do_cothread_switch (do_cothread_get_main (((GstOptScheduler*)sched)->context)); 
1724   return FALSE;
1725 #else
1726   {
1727     GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1728  
1729     GST_INFO ("scheduler set interrupted state");
1730     osched->state = GST_OPT_SCHEDULER_STATE_INTERRUPTED;
1731   }
1732   return TRUE;
1733 #endif
1734 }
1735
1736 static void
1737 gst_opt_scheduler_error (GstScheduler *sched, GstElement *element)
1738 {
1739   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1740   GstOptSchedulerGroup *group;
1741   get_group (element, &group);
1742   if (group)
1743     group_error_handler (group);
1744
1745   osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
1746 }
1747
1748 /* link pads, merge groups and chains */
1749 static void
1750 gst_opt_scheduler_pad_link (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
1751 {
1752   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1753   LinkType type = GST_OPT_INVALID;
1754   GstElement *element1, *element2;
1755
1756   GST_INFO ("scheduling link between %s:%s and %s:%s", 
1757             GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
1758
1759   element1 = GST_PAD_PARENT (srcpad);
1760   element2 = GST_PAD_PARENT (sinkpad);
1761
1762   /* first we need to figure out what type of link we're dealing
1763    * with */
1764   if (element1->loopfunc && element2->loopfunc)
1765     type = GST_OPT_LOOP_TO_LOOP;
1766   else {
1767     if (element1->loopfunc) {
1768       if (GST_RPAD_CHAINFUNC (sinkpad))
1769         type = GST_OPT_LOOP_TO_CHAIN;
1770     }
1771     else if (element2->loopfunc) {
1772       if (GST_RPAD_GETFUNC (srcpad)) {
1773         type = GST_OPT_GET_TO_LOOP;
1774         /* this could be tricky, the get based source could 
1775          * already be part of a loop based group in another pad,
1776          * we assert on that for now */
1777         if (GST_ELEMENT_SCHED_CONTEXT (element1) != NULL &&
1778             GST_ELEMENT_SCHED_GROUP (element1) != NULL) 
1779         {
1780           GstOptSchedulerGroup *group = GST_ELEMENT_SCHED_GROUP (element1);
1781
1782           /* if the loop based element is the entry point we're ok, if it
1783            * isn't then we have multiple loop based elements in this group */
1784           if (group->entry != element2) {
1785             g_error ("internal error: cannot schedule get to loop in multi-loop based group");
1786             return;
1787           }
1788         }
1789       }
1790       else
1791         type = GST_OPT_CHAIN_TO_LOOP;
1792     }
1793     else {
1794       if (GST_RPAD_GETFUNC (srcpad) && GST_RPAD_CHAINFUNC (sinkpad)) {
1795         type = GST_OPT_GET_TO_CHAIN;
1796         /* the get based source could already be part of a loop 
1797          * based group in another pad, we assert on that for now */
1798         if (GST_ELEMENT_SCHED_CONTEXT (element1) != NULL &&
1799             GST_ELEMENT_SCHED_GROUP (element1) != NULL) 
1800         {
1801           GstOptSchedulerGroup *group = GST_ELEMENT_SCHED_GROUP (element1);
1802
1803           /* if the get based element is the entry point we're ok, if it
1804            * isn't then we have a mixed loop/chain based group */
1805           if (group->entry != element1) {
1806             g_error ("internal error: cannot schedule get to chain with mixed loop/chain based group");
1807             return;
1808           }
1809         }
1810       }
1811       else 
1812         type = GST_OPT_CHAIN_TO_CHAIN;
1813     }
1814   }
1815  
1816   /* since we can't set event handlers on pad creation after addition, it is
1817    * best we set all of them again to the default before linking */
1818   GST_RPAD_EVENTHANDLER (srcpad) = GST_RPAD_EVENTFUNC (srcpad);
1819   GST_RPAD_EVENTHANDLER (sinkpad) = GST_RPAD_EVENTFUNC (sinkpad);
1820   
1821   /* for each link type, perform specific actions */
1822   switch (type) {
1823     case GST_OPT_GET_TO_CHAIN:
1824     {
1825       GstOptSchedulerGroup *group = NULL;
1826
1827       GST_LOG ("get to chain based link");
1828
1829       /* setup get/chain handlers */
1830       GST_RPAD_GETHANDLER (srcpad) = GST_RPAD_GETFUNC (srcpad);
1831       if (GST_ELEMENT_IS_EVENT_AWARE (element2))
1832         GST_RPAD_CHAINHANDLER (sinkpad) = GST_RPAD_CHAINFUNC (sinkpad);
1833       else
1834         GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_chain_wrapper;
1835
1836       /* the two elements should be put into the same group, 
1837        * this also means that they are in the same chain automatically */
1838       group = group_elements (osched, element1, element2,
1839                               GST_OPT_SCHEDULER_GROUP_GET);
1840
1841       /* if there is not yet an entry in the group, select the source
1842        * element as the entry point */
1843       if (!group->entry) {
1844         group->entry = element1;
1845
1846         GST_DEBUG ("setting \"%s\" as entry point of _get-based group %p", 
1847                    GST_ELEMENT_NAME (element1), group);
1848       }
1849       break;
1850     }
1851     case GST_OPT_LOOP_TO_CHAIN:
1852     case GST_OPT_CHAIN_TO_CHAIN:
1853       GST_LOG ("loop/chain to chain based link");
1854
1855       if (GST_ELEMENT_IS_EVENT_AWARE (element2))
1856         GST_RPAD_CHAINHANDLER (sinkpad) = GST_RPAD_CHAINFUNC (sinkpad);
1857       else
1858         GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_chain_wrapper;
1859
1860       /* the two elements should be put into the same group, 
1861        * this also means that they are in the same chain automatically, 
1862        * in case of a loop-based element1, there will be a group for element1 and
1863        * element2 will be added to it. */
1864       group_elements (osched, element1, element2, GST_OPT_SCHEDULER_GROUP_LOOP);
1865       break;
1866     case GST_OPT_GET_TO_LOOP:
1867       GST_LOG ("get to loop based link");
1868
1869       GST_RPAD_GETHANDLER (srcpad) = GST_RPAD_GETFUNC (srcpad);
1870
1871       /* the two elements should be put into the same group, 
1872        * this also means that they are in the same chain automatically, 
1873        * element2 is loop-based so it already has a group where element1
1874        * will be added to */
1875       group_elements (osched, element1, element2, GST_OPT_SCHEDULER_GROUP_LOOP);
1876       break;
1877     case GST_OPT_CHAIN_TO_LOOP:
1878     case GST_OPT_LOOP_TO_LOOP:
1879     {
1880       GstOptSchedulerGroup *group1, *group2;
1881
1882       GST_LOG ("chain/loop to loop based link");
1883
1884       GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_loop_wrapper;
1885       GST_RPAD_GETHANDLER (srcpad) = gst_opt_scheduler_get_wrapper;
1886       /* events on the srcpad have to be intercepted as we might need to
1887        * flush the buffer lists, so override the given eventfunc */
1888       GST_RPAD_EVENTHANDLER (srcpad) = gst_opt_scheduler_event_wrapper;
1889
1890       group1 = GST_ELEMENT_SCHED_GROUP (element1);
1891       group2 = GST_ELEMENT_SCHED_GROUP (element2);
1892
1893       g_assert (group2 != NULL);
1894
1895       /* group2 is guaranteed to exist as it contains a loop-based element.
1896        * group1 only exists if element1 is linked to some other element */
1897       if (!group1) {
1898         /* create a new group for element1 as it cannot be merged into another group
1899          * here. we create the group in the same chain as the loop-based element. */
1900         GST_DEBUG ("creating new group for element %s", GST_ELEMENT_NAME (element1));
1901         group1 = create_group (group2->chain, element1,
1902                                GST_OPT_SCHEDULER_GROUP_LOOP);
1903       }
1904       else {
1905         /* both elements are already in a group, make sure they are added to
1906          * the same chain */
1907         merge_chains (group1->chain, group2->chain);
1908       }
1909       group_inc_link (group1, group2);
1910       break;
1911     }
1912     case GST_OPT_INVALID:
1913       g_error ("(internal error) invalid element link, what are you doing?");
1914       break;
1915   }
1916 }
1917
1918 /* 
1919  * checks if an element is still linked to some other element in the group. 
1920  * no checking is done on the brokenpad arg 
1921  */
1922 static gboolean
1923 element_has_link_with_group (GstElement *element, GstOptSchedulerGroup *group, GstPad *brokenpad)
1924 {
1925   gboolean linked = FALSE;
1926   const GList *pads;
1927
1928   /* see if the element has no more links to the peer group */
1929   pads = gst_element_get_pad_list (element);
1930   while (pads && !linked) {
1931     GstPad *pad = GST_PAD (pads->data);
1932     pads = g_list_next (pads);
1933
1934     /* we only operate on real pads and on the pad that is not broken */
1935     if (!GST_IS_REAL_PAD (pad) || pad == brokenpad)
1936       continue;
1937
1938     if (GST_PAD_PEER (pad)) {
1939       GstElement *parent;
1940       GstOptSchedulerGroup *parentgroup;
1941
1942       /* see in what group this element is */
1943       parent = GST_PAD_PARENT (GST_PAD_PEER (pad));
1944
1945       /* links with decoupled elements are valid */
1946       if (GST_ELEMENT_IS_DECOUPLED (parent)) {
1947         linked = TRUE;
1948       }
1949       else {
1950         /* for non-decoupled elements we need to check the group */
1951         get_group (parent, &parentgroup);
1952
1953         /* if it's in the same group, we're still linked */
1954         if (parentgroup == group)
1955           linked = TRUE;
1956       }
1957     } 
1958   }
1959   return linked;
1960 }
1961
1962 /* 
1963  * checks if a target group is still reachable from the group without taking the broken
1964  * group link into account.
1965  */
1966 static gboolean
1967 group_can_reach_group (GstOptSchedulerGroup *group, GstOptSchedulerGroup *target)
1968 {
1969   gboolean reachable = FALSE;
1970   const GSList *links = group->group_links;
1971
1972   GST_LOG ("checking if group %p can reach %p", 
1973            group, target);
1974
1975   /* seems like we found the target element */
1976   if (group == target) {
1977     GST_LOG ("found way to reach %p", target);
1978     return TRUE;
1979   }
1980
1981   /* if the group is marked as visited, we don't need to check here */
1982   if (GST_OPT_SCHEDULER_GROUP_IS_FLAG_SET (group, GST_OPT_SCHEDULER_GROUP_VISITED)) {
1983     GST_LOG ("already visited %p", group);
1984     return FALSE;
1985   }
1986
1987   /* mark group as visited */
1988   GST_OPT_SCHEDULER_GROUP_SET_FLAG (group, GST_OPT_SCHEDULER_GROUP_VISITED);
1989
1990   while (links && !reachable) {
1991     GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) links->data;
1992     GstOptSchedulerGroup *other;
1993
1994     links = g_slist_next (links);
1995
1996     /* find other group in this link */
1997     other = OTHER_GROUP_LINK (link, group);
1998
1999     GST_LOG ("found link from %p to %p, count %d", 
2000              group, other, link->count);
2001
2002     /* check if we can reach the target recursiveley */
2003     reachable = group_can_reach_group (other, target);
2004   }
2005   /* unset the visited flag, note that this is not optimal as we might be checking
2006    * groups several times when they are reachable with a loop. An alternative would be
2007    * to not clear the group flag at this stage but clear all flags in the chain when
2008    * all groups are checked. */
2009   GST_OPT_SCHEDULER_GROUP_UNSET_FLAG (group, GST_OPT_SCHEDULER_GROUP_VISITED);
2010
2011   GST_LOG ("leaving group %p with %s", group, (reachable ? "TRUE":"FALSE"));
2012
2013   return reachable;
2014 }
2015
2016 static void
2017 group_dec_links_for_element (GstOptSchedulerGroup* group, GstElement *element)
2018 {
2019   GList *l;
2020   GstPad *pad;
2021   GstOptSchedulerGroup *peer_group;
2022   
2023   for (l=GST_ELEMENT_PADS(element); l; l=l->next) {
2024     pad = (GstPad*)l->data;
2025     if (GST_IS_REAL_PAD (pad) && GST_PAD_PEER (pad)) {
2026       get_group (GST_PAD_PARENT (GST_PAD_PEER (pad)), &peer_group);
2027       if (peer_group && peer_group != group)
2028         group_dec_link (group, peer_group);
2029     }
2030   }
2031 }
2032
2033 static void
2034 gst_opt_scheduler_pad_unlink (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
2035 {
2036   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2037   GstElement *element1, *element2;
2038   GstOptSchedulerGroup *group1, *group2;
2039
2040   GST_INFO ("unscheduling link between %s:%s and %s:%s", 
2041             GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
2042
2043   element1 = GST_PAD_PARENT (srcpad);
2044   element2 = GST_PAD_PARENT (sinkpad);
2045   
2046   get_group (element1, &group1);
2047   get_group (element2, &group2);
2048
2049   /* for decoupled elements (that are never put into a group) we use the
2050    * group of the peer element for the remainder of the algorithm */
2051   if (GST_ELEMENT_IS_DECOUPLED (element1)) {
2052     group1 = group2;
2053   }
2054   if (GST_ELEMENT_IS_DECOUPLED (element2)) {
2055     group2 = group1;
2056   }
2057
2058   /* if one the elements has no group (anymore) we don't really care 
2059    * about the link */
2060   if (!group1 || !group2) {
2061     GST_LOG ("one (or both) of the elements is not in a group, not interesting");
2062     return;
2063   }
2064
2065   /* easy part, groups are different */
2066   if (group1 != group2) {
2067     gboolean zero;
2068
2069     GST_LOG ("elements are in different groups");
2070
2071     /* we can remove the links between the groups now */
2072     zero = group_dec_link (group1, group2);
2073
2074     /* if the groups are not directly connected anymore, we have to perform a recursive check
2075      * to see if they are really unlinked */
2076     if (zero) {
2077       gboolean still_link;
2078       GstOptSchedulerChain *chain;
2079
2080       /* see if group1 and group2 are still connected in any indirect way */
2081       still_link = group_can_reach_group (group1, group2);
2082
2083       GST_DEBUG ("group %p %s reach group %p", group1, (still_link ? "can":"can't"), group2);
2084       if (!still_link) {
2085         /* groups are really disconnected, migrate one group to a new chain */
2086         chain = create_chain (osched);
2087         chain_recursively_migrate_group (chain, group1);
2088
2089         GST_DEBUG ("migrated group %p to new chain %p", group1, chain);
2090       }
2091     }
2092     else {
2093       GST_DEBUG ("group %p still has direct link with group %p", group1, group2);
2094     }
2095   }
2096   /* hard part, groups are equal */
2097   else {
2098     gboolean still_link1, still_link2;
2099     GstOptSchedulerGroup *group;
2100     GstElement *element = NULL; /* shut up gcc */
2101     
2102     /* since group1 == group2, it doesn't matter which group we take */
2103     group = group1;
2104
2105     GST_LOG ("elements are in the same group %p", group);
2106
2107     /* check if the element is still linked to some other element in the group,
2108      * we pass the pad that is broken up as an arg because a link on that pad
2109      * is not valid anymore.
2110      * Note that this check is only to make sure that a single element can be removed 
2111      * completely from the group, we also have to check for migrating several 
2112      * elements to a new group. */
2113     still_link1 = element_has_link_with_group (element1, group, srcpad);
2114     still_link2 = element_has_link_with_group (element2, group, sinkpad);
2115     /* if there is still a link, we don't need to break this group */
2116     if (still_link1 && still_link2) {
2117       GSList *l;
2118       GList *m;;
2119       int linkcount;
2120       
2121       GST_LOG ( "elements still have links with other elements in the group");
2122       
2123       while (group->elements)
2124         for (l = group->elements; l && l->data; l = l->next)  {
2125           element = (GstElement*)l->data;
2126           if (GST_ELEMENT_IS_DECOUPLED (element))
2127             continue;
2128             
2129           linkcount = 0;
2130           GST_LOG ("Examining %s\n", GST_ELEMENT_NAME (element));
2131           for (m = GST_ELEMENT_PADS (element); m; m = m->next) {
2132             GstPad *peer, *pad;
2133             GstElement *parent;
2134             GstOptSchedulerGroup *peer_group;
2135
2136             pad = (GstPad*)m->data;
2137             if (!pad || !GST_IS_REAL_PAD (pad))
2138               continue;
2139
2140             peer = GST_PAD_PEER (pad);
2141             if (!peer || !GST_IS_REAL_PAD (peer))
2142               continue;
2143                 
2144             parent = GST_PAD_PARENT (GST_PAD_PEER (pad));
2145             get_group (parent, &peer_group);
2146             if (peer_group && peer_group != group) {
2147               GST_LOG ("pad %s is linked with %s\n",
2148                        GST_PAD_NAME (pad),
2149                        GST_ELEMENT_NAME (parent));
2150               linkcount++;
2151             }
2152           }
2153
2154           if (linkcount < 2) {
2155             group_dec_links_for_element  (group, element);
2156             remove_from_group (group, element);
2157           }
2158           /* if linkcount == 2, it will be unlinked later on */
2159           else if (linkcount > 2) {
2160             g_warning ("opt: Can't handle element %s with 3 or more links, aborting",
2161                        GST_ELEMENT_NAME (element));
2162             return;
2163           }
2164         }           
2165       /* Peer element will be catched during next iteration */
2166       return;
2167     }
2168
2169     if (!still_link1 && !still_link2)
2170       return;
2171     
2172     /* now check which one of the elements we can remove from the group */
2173     if (still_link1) {
2174       element = element2;
2175     }
2176     else if (still_link2) {
2177       element = element1;
2178     }
2179
2180     /* we only remove elements that are not the entry point of a loop based
2181      * group and are not decoupled */
2182     if (!(group->entry == element &&
2183           group->type == GST_OPT_SCHEDULER_GROUP_LOOP) &&
2184         !GST_ELEMENT_IS_DECOUPLED (element)) {        
2185       GST_LOG ("element is separated from the group");
2186       
2187       /* have to decrement links to other groups from other pads */
2188       group_dec_links_for_element  (group, element);
2189       remove_from_group (group, element);
2190     }
2191     else {
2192       GST_LOG ("element is decoupled or entry in loop based group");
2193     }
2194   }
2195 }
2196
2197 static void
2198 gst_opt_scheduler_pad_select (GstScheduler *sched, GList *padlist)
2199 {
2200   //GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2201   
2202   g_warning ("pad select, implement me");
2203 }
2204
2205 /* a scheduler iteration is done by looping and scheduling the active chains */
2206 static GstSchedulerState
2207 gst_opt_scheduler_iterate (GstScheduler *sched)
2208 {
2209   GstSchedulerState state = GST_SCHEDULER_STATE_STOPPED;
2210   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2211   gint iterations = osched->iterations;
2212
2213   osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
2214
2215   GST_DEBUG_OBJECT (sched, "iterating");
2216
2217   while (iterations) {
2218     gboolean scheduled = FALSE;
2219     GSList *chains;
2220
2221     /* we have to schedule each of the scheduler chains now */
2222     chains = osched->chains;
2223     while (chains) {
2224       GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
2225
2226       ref_chain (chain);
2227       /* if the chain is not disabled, schedule it */
2228       if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
2229         GST_LOG ("scheduling chain %p", chain);
2230         schedule_chain (chain);
2231         scheduled = TRUE;
2232       }
2233
2234       /* don't schedule any more chains when in error */
2235       if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
2236         GST_ERROR_OBJECT (sched, "in error state");
2237         break;
2238       } 
2239       else if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
2240         GST_DEBUG_OBJECT (osched, "got interrupted, continue with next chain");
2241         osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
2242       }
2243
2244       chains = g_slist_next (chains);
2245       unref_chain (chain);
2246     }
2247
2248     /* at this point it's possible that the scheduler state is
2249      * in error, we then return an error */
2250     if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
2251       state = GST_SCHEDULER_STATE_ERROR;
2252       break;
2253     }
2254     else {
2255       /* if chains were scheduled, return our current state */
2256       if (scheduled)
2257         state = GST_SCHEDULER_STATE (sched);
2258       /* if no chains were scheduled, we say we are stopped */
2259       else {
2260         state = GST_SCHEDULER_STATE_STOPPED;
2261         break;
2262       }
2263     }
2264     if (iterations > 0)
2265       iterations--;
2266   }
2267
2268   return state;
2269 }
2270
2271
2272 static void
2273 gst_opt_scheduler_show (GstScheduler *sched)
2274 {
2275   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2276   GSList *chains;
2277
2278   g_print ("iterations:    %d\n", osched->iterations);
2279   g_print ("max recursion: %d\n", osched->max_recursion);
2280
2281   chains = osched->chains;
2282   while (chains) {
2283     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
2284     GSList *groups = chain->groups;
2285     chains = g_slist_next (chains);
2286
2287     g_print ("+- chain %p: refcount %d, %d groups, %d enabled, flags %d\n", 
2288                     chain, chain->refcount, chain->num_groups, chain->num_enabled, chain->flags);
2289
2290     while (groups) {
2291       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
2292       GSList *elements = group->elements;
2293       groups = g_slist_next (groups);
2294
2295       g_print (" +- group %p: refcount %d, %d elements, %d enabled, flags %d, entry %s, %s\n", 
2296                       group, group->refcount, group->num_elements, group->num_enabled, group->flags,
2297                       (group->entry ? GST_ELEMENT_NAME (group->entry): "(none)"),
2298                       (group->type == GST_OPT_SCHEDULER_GROUP_GET ? "get-based" : "loop-based") );
2299
2300       while (elements) {
2301         GstElement *element = (GstElement *) elements->data;
2302         elements = g_slist_next (elements);
2303
2304         g_print ("  +- element %s\n", GST_ELEMENT_NAME (element));
2305       }
2306     }
2307   }
2308 }
2309
2310 static void
2311 gst_opt_scheduler_get_property (GObject *object, guint prop_id,
2312                                 GValue *value, GParamSpec *pspec)
2313 {
2314   GstOptScheduler *osched;
2315   
2316   g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
2317
2318   osched = GST_OPT_SCHEDULER (object);
2319
2320   switch (prop_id) {
2321     case ARG_ITERATIONS:
2322       g_value_set_int (value, osched->iterations);
2323       break;
2324     case ARG_MAX_RECURSION:
2325       g_value_set_int (value, osched->max_recursion);
2326       break;
2327     default:
2328       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2329       break;
2330   }
2331 }
2332
2333 static void
2334 gst_opt_scheduler_set_property (GObject *object, guint prop_id,
2335                                 const GValue *value, GParamSpec *pspec)
2336 {
2337   GstOptScheduler *osched;
2338   
2339   g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
2340
2341   osched = GST_OPT_SCHEDULER (object);
2342
2343   switch (prop_id) {
2344     case ARG_ITERATIONS:
2345       osched->iterations = g_value_get_int (value);
2346       break;
2347     case ARG_MAX_RECURSION:
2348       osched->max_recursion = g_value_get_int (value);
2349       break;
2350     default:
2351       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2352       break;
2353   }
2354 }