Reference the group when we add an element to it.
[platform/upstream/gstreamer.git] / gst / schedulers / gstoptimalscheduler.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstscheduler.c: Default scheduling code for most cases
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #include <gst/gst.h>
28
29 GST_DEBUG_CATEGORY_STATIC(debug_scheduler);
30 #define GST_CAT_DEFAULT debug_scheduler
31
32 #ifdef USE_COTHREADS
33 # include "cothreads_compat.h"
34 #else
35 # define COTHREADS_NAME_CAPITAL ""
36 # define COTHREADS_NAME         ""
37 #endif
38
39 #define GST_ELEMENT_SCHED_CONTEXT(elem)         ((GstOptSchedulerCtx*) (GST_ELEMENT (elem)->sched_private))
40 #define GST_ELEMENT_SCHED_GROUP(elem)           (GST_ELEMENT_SCHED_CONTEXT (elem)->group)
41 #define GST_PAD_BUFLIST(pad)                    ((GList*) (GST_REAL_PAD(pad)->sched_private))
42
43 #define GST_ELEMENT_COTHREAD_STOPPING                   GST_ELEMENT_SCHEDULER_PRIVATE1
44 #define GST_ELEMENT_IS_COTHREAD_STOPPING(element)       GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
45 #define GST_ELEMENT_INTERRUPTED                         GST_ELEMENT_SCHEDULER_PRIVATE2
46 #define GST_ELEMENT_IS_INTERRUPTED(element)             GST_FLAG_IS_SET((element), GST_ELEMENT_INTERRUPTED)
47
48 typedef struct _GstOptScheduler GstOptScheduler;
49 typedef struct _GstOptSchedulerClass GstOptSchedulerClass;
50
51 #define GST_TYPE_OPT_SCHEDULER \
52   (gst_opt_scheduler_get_type())
53 #define GST_OPT_SCHEDULER(obj) \
54   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_OPT_SCHEDULER,GstOptScheduler))
55 #define GST_OPT_SCHEDULER_CLASS(klass) \
56   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_OPT_SCHEDULER,GstOptSchedulerClass))
57 #define GST_IS_OPT_SCHEDULER(obj) \
58   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_OPT_SCHEDULER))
59 #define GST_IS_OPT_SCHEDULER_CLASS(obj) \
60   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_OPT_SCHEDULER))
61
62 typedef enum {
63   GST_OPT_SCHEDULER_STATE_NONE,
64   GST_OPT_SCHEDULER_STATE_STOPPED,
65   GST_OPT_SCHEDULER_STATE_ERROR,
66   GST_OPT_SCHEDULER_STATE_RUNNING,
67   GST_OPT_SCHEDULER_STATE_INTERRUPTED
68 } GstOptSchedulerState;
69
70 struct _GstOptScheduler {
71   GstScheduler           parent;
72
73   GstOptSchedulerState   state;
74
75 #ifdef USE_COTHREADS
76   cothread_context      *context;
77 #endif
78   gint                   iterations;
79
80   GSList                *elements;
81   GSList                *chains;
82
83   GList                 *runqueue;
84   gint                   recursion;
85
86   gint                   max_recursion;
87 };
88
89 struct _GstOptSchedulerClass {
90   GstSchedulerClass parent_class;
91 };
92
93 static GType _gst_opt_scheduler_type = 0;
94
95 typedef enum {
96   GST_OPT_SCHEDULER_CHAIN_DIRTY                 = (1 << 1),     
97   GST_OPT_SCHEDULER_CHAIN_DISABLED              = (1 << 2),
98   GST_OPT_SCHEDULER_CHAIN_RUNNING               = (1 << 3),
99 } GstOptSchedulerChainFlags;
100
101 #define GST_OPT_SCHEDULER_CHAIN_DISABLE(chain)          ((chain)->flags |= GST_OPT_SCHEDULER_CHAIN_DISABLED)
102 #define GST_OPT_SCHEDULER_CHAIN_ENABLE(chain)           ((chain)->flags &= ~GST_OPT_SCHEDULER_CHAIN_DISABLED)
103 #define GST_OPT_SCHEDULER_CHAIN_IS_DISABLED(chain)      ((chain)->flags & GST_OPT_SCHEDULER_CHAIN_DISABLED)
104
105 typedef struct _GstOptSchedulerChain GstOptSchedulerChain;
106
107 struct _GstOptSchedulerChain {
108   gint                           refcount;
109   
110   GstOptScheduler               *sched;
111
112   GstOptSchedulerChainFlags      flags;
113   
114   GSList                        *groups;                        /* the groups in this chain */
115   gint                           num_groups;
116   gint                           num_enabled;
117 };
118
119 /* 
120  * elements that are scheduled in one cothread 
121  */
122 typedef enum {
123   GST_OPT_SCHEDULER_GROUP_DIRTY                 = (1 << 1),     /* this group has been modified */
124   GST_OPT_SCHEDULER_GROUP_COTHREAD_STOPPING     = (1 << 2),     /* the group's cothread stops after one iteration */
125   GST_OPT_SCHEDULER_GROUP_DISABLED              = (1 << 3),     /* this group is disabled */
126   GST_OPT_SCHEDULER_GROUP_RUNNING               = (1 << 4),     /* this group is running */
127   GST_OPT_SCHEDULER_GROUP_SCHEDULABLE           = (1 << 5),     /* this group is schedulable */
128   GST_OPT_SCHEDULER_GROUP_VISITED               = (1 << 6),     /* this group is visited when finding links */
129 } GstOptSchedulerGroupFlags;
130
131 typedef enum {
132   GST_OPT_SCHEDULER_GROUP_GET                   = 1,
133   GST_OPT_SCHEDULER_GROUP_LOOP                  = 2,
134 } GstOptSchedulerGroupType;
135
136 #define GST_OPT_SCHEDULER_GROUP_SET_FLAG(group,flag)    ((group)->flags |= (flag))
137 #define GST_OPT_SCHEDULER_GROUP_UNSET_FLAG(group,flag)  ((group)->flags &= ~(flag))
138 #define GST_OPT_SCHEDULER_GROUP_IS_FLAG_SET(group,flag) ((group)->flags & (flag))
139
140 #define GST_OPT_SCHEDULER_GROUP_DISABLE(group)          ((group)->flags |= GST_OPT_SCHEDULER_GROUP_DISABLED)
141 #define GST_OPT_SCHEDULER_GROUP_ENABLE(group)           ((group)->flags &= ~GST_OPT_SCHEDULER_GROUP_DISABLED)
142 #define GST_OPT_SCHEDULER_GROUP_IS_ENABLED(group)       (!((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED))
143 #define GST_OPT_SCHEDULER_GROUP_IS_DISABLED(group)      ((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED)
144
145
146 typedef struct _GstOptSchedulerGroup GstOptSchedulerGroup;
147 typedef struct _GstOptSchedulerGroupLink GstOptSchedulerGroupLink;
148
149 /* used to keep track of links with other groups */
150 struct _GstOptSchedulerGroupLink {
151   GstOptSchedulerGroup  *group1;        /* the group we are linked with */
152   GstOptSchedulerGroup  *group2;        /* the group we are linked with */
153   gint                   count;         /* the number of links with the group */
154 };
155
156 #define IS_GROUP_LINK(link, group1, group2)     ((link->group1 == group1 && link->group2 == group2) || \
157                                                  (link->group2 == group1 && link->group1 == group2))
158 #define OTHER_GROUP_LINK(link, group)           (link->group1 == group ? link->group2 : link->group1)
159
160 typedef int (*GroupScheduleFunction)    (int argc, char *argv[]);
161
162 struct _GstOptSchedulerGroup {
163   GstOptSchedulerChain          *chain;                 /* the chain this group belongs to */
164   GstOptSchedulerGroupFlags      flags;                 /* flags for this group */
165   GstOptSchedulerGroupType       type;                  /* flags for this group */
166
167   gint                           refcount;
168
169   GSList                        *elements;              /* elements of this group */
170   gint                           num_elements;
171   gint                           num_enabled;
172   GstElement                    *entry;                 /* the group's entry point */
173
174   GSList                        *group_links;           /* other groups that are linked with this group */
175
176 #ifdef USE_COTHREADS
177   cothread                      *cothread;              /* the cothread of this group */
178 #else
179   GroupScheduleFunction          schedulefunc;
180 #endif
181   int                            argc;
182   char                         **argv;
183 };
184
185
186 /* some group operations */
187 static GstOptSchedulerGroup*    ref_group                       (GstOptSchedulerGroup *group);
188 #ifndef USE_COTHREADS
189 /*
190 static GstOptSchedulerGroup*    ref_group_by_count              (GstOptSchedulerGroup *group, gint count);
191 */
192 #endif
193 static GstOptSchedulerGroup*    unref_group                     (GstOptSchedulerGroup *group);
194 static void                     destroy_group                   (GstOptSchedulerGroup *group);
195 static void                     group_element_set_enabled       (GstOptSchedulerGroup *group, 
196                                                                  GstElement *element, gboolean enabled);
197
198 static void                     chain_group_set_enabled         (GstOptSchedulerChain *chain, 
199                                                                  GstOptSchedulerGroup *group, gboolean enabled);
200 /* 
201  * Scheduler private data for an element 
202  */
203 typedef struct _GstOptSchedulerCtx GstOptSchedulerCtx;
204
205 typedef enum {
206   GST_OPT_SCHEDULER_CTX_DISABLED                = (1 << 1),     /* the element is disabled */
207 } GstOptSchedulerCtxFlags;
208
209 struct _GstOptSchedulerCtx {
210   GstOptSchedulerGroup *group;                          /* the group this element belongs to */
211
212   GstOptSchedulerCtxFlags flags;                        /* flags for this element */
213 };
214
215 enum
216 {
217   ARG_0,
218   ARG_ITERATIONS,
219   ARG_MAX_RECURSION,
220 };
221  
222
223 static void             gst_opt_scheduler_class_init            (GstOptSchedulerClass *klass);
224 static void             gst_opt_scheduler_init                  (GstOptScheduler *scheduler);
225
226 static void             gst_opt_scheduler_set_property          (GObject *object, guint prop_id,
227                                                                  const GValue *value, GParamSpec *pspec);
228 static void             gst_opt_scheduler_get_property          (GObject *object, guint prop_id,
229                                                                  GValue *value, GParamSpec *pspec);
230
231 static void             gst_opt_scheduler_dispose               (GObject *object);
232
233 static void             gst_opt_scheduler_setup                 (GstScheduler *sched);
234 static void             gst_opt_scheduler_reset                 (GstScheduler *sched);
235 static void             gst_opt_scheduler_add_element           (GstScheduler *sched, GstElement *element);
236 static void             gst_opt_scheduler_remove_element        (GstScheduler *sched, GstElement *element);
237 static GstElementStateReturn  
238                         gst_opt_scheduler_state_transition      (GstScheduler *sched, GstElement *element, gint transition);
239 static void             gst_opt_scheduler_scheduling_change     (GstScheduler *sched, GstElement *element);
240 static void             gst_opt_scheduler_lock_element          (GstScheduler *sched, GstElement *element);
241 static void             gst_opt_scheduler_unlock_element        (GstScheduler *sched, GstElement *element);
242 static gboolean         gst_opt_scheduler_yield                 (GstScheduler *sched, GstElement *element);
243 static gboolean         gst_opt_scheduler_interrupt             (GstScheduler *sched, GstElement *element);
244 static void             gst_opt_scheduler_error                 (GstScheduler *sched, GstElement *element);
245 static void             gst_opt_scheduler_pad_link              (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
246 static void             gst_opt_scheduler_pad_unlink            (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
247 static void             gst_opt_scheduler_pad_select            (GstScheduler *sched, GList *padlist);
248 static GstClockReturn   gst_opt_scheduler_clock_wait            (GstScheduler *sched, GstElement *element,
249                                                                  GstClockID id, GstClockTimeDiff *jitter);
250 static GstSchedulerState
251                         gst_opt_scheduler_iterate               (GstScheduler *sched);
252
253 static void             gst_opt_scheduler_show                  (GstScheduler *sched);
254
255 static GstSchedulerClass *parent_class = NULL;
256
257 static GType
258 gst_opt_scheduler_get_type (void)
259 {
260   if (!_gst_opt_scheduler_type) {
261     static const GTypeInfo scheduler_info = {
262       sizeof (GstOptSchedulerClass),
263       NULL,
264       NULL,
265       (GClassInitFunc) gst_opt_scheduler_class_init,
266       NULL,
267       NULL,
268       sizeof (GstOptScheduler),
269       0,
270       (GInstanceInitFunc) gst_opt_scheduler_init,
271       NULL
272     };
273
274     _gst_opt_scheduler_type = g_type_register_static (GST_TYPE_SCHEDULER, 
275                     "GstOpt"COTHREADS_NAME_CAPITAL"Scheduler", &scheduler_info, 0);
276   }
277   return _gst_opt_scheduler_type;
278 }
279
280 static void
281 gst_opt_scheduler_class_init (GstOptSchedulerClass *klass)
282 {
283   GObjectClass *gobject_class;
284   GstObjectClass *gstobject_class;
285   GstSchedulerClass *gstscheduler_class;
286
287   gobject_class = (GObjectClass*)klass;
288   gstobject_class = (GstObjectClass*)klass;
289   gstscheduler_class = (GstSchedulerClass*)klass;
290
291   parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
292
293   gobject_class->set_property   = GST_DEBUG_FUNCPTR (gst_opt_scheduler_set_property);
294   gobject_class->get_property   = GST_DEBUG_FUNCPTR (gst_opt_scheduler_get_property);
295   gobject_class->dispose        = GST_DEBUG_FUNCPTR (gst_opt_scheduler_dispose);
296
297   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ITERATIONS,
298     g_param_spec_int ("iterations", "Iterations", 
299                       "Number of groups to schedule in one iteration (-1 == until EOS/error)",
300                       -1, G_MAXINT, 1, G_PARAM_READWRITE));
301 #ifndef USE_COTHREADS
302   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_RECURSION,
303     g_param_spec_int ("max_recursion", "Max recursion", 
304                       "Maximum number of recursions",
305                       1, G_MAXINT, 100, G_PARAM_READWRITE));
306 #endif
307
308   gstscheduler_class->setup             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_setup);
309   gstscheduler_class->reset             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_reset);
310   gstscheduler_class->add_element       = GST_DEBUG_FUNCPTR (gst_opt_scheduler_add_element);
311   gstscheduler_class->remove_element    = GST_DEBUG_FUNCPTR (gst_opt_scheduler_remove_element);
312   gstscheduler_class->state_transition  = GST_DEBUG_FUNCPTR (gst_opt_scheduler_state_transition);
313   gstscheduler_class->scheduling_change = GST_DEBUG_FUNCPTR (gst_opt_scheduler_scheduling_change);
314   gstscheduler_class->lock_element      = GST_DEBUG_FUNCPTR (gst_opt_scheduler_lock_element);
315   gstscheduler_class->unlock_element    = GST_DEBUG_FUNCPTR (gst_opt_scheduler_unlock_element);
316   gstscheduler_class->yield             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_yield);
317   gstscheduler_class->interrupt         = GST_DEBUG_FUNCPTR (gst_opt_scheduler_interrupt);
318   gstscheduler_class->error             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_error);
319   gstscheduler_class->pad_link          = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_link);
320   gstscheduler_class->pad_unlink        = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_unlink);
321   gstscheduler_class->pad_select        = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_select);
322   gstscheduler_class->clock_wait        = GST_DEBUG_FUNCPTR (gst_opt_scheduler_clock_wait);
323   gstscheduler_class->iterate           = GST_DEBUG_FUNCPTR (gst_opt_scheduler_iterate);
324   gstscheduler_class->show              = GST_DEBUG_FUNCPTR (gst_opt_scheduler_show);
325   
326 #ifdef USE_COTHREADS
327   do_cothreads_init(NULL);
328 #endif
329 }
330
331 static void
332 gst_opt_scheduler_init (GstOptScheduler *scheduler)
333 {
334   scheduler->elements = NULL;
335   scheduler->iterations = 1;
336   scheduler->max_recursion = 100;
337 }
338
339 static void
340 gst_opt_scheduler_dispose (GObject *object)
341 {
342   G_OBJECT_CLASS (parent_class)->dispose (object);
343 }
344
345 static gboolean
346 plugin_init (GstPlugin *plugin)
347 {
348   GstSchedulerFactory *factory;
349
350   GST_DEBUG_CATEGORY_INIT (debug_scheduler, "scheduler", 0, "optimal scheduler");
351
352 #ifdef USE_COTHREADS
353   factory = gst_scheduler_factory_new ("opt"COTHREADS_NAME,
354                                        "An optimal scheduler using "COTHREADS_NAME" cothreads",
355                                       gst_opt_scheduler_get_type());
356 #else
357   factory = gst_scheduler_factory_new ("opt",
358                                        "An optimal scheduler using no cothreads",
359                                       gst_opt_scheduler_get_type());
360 #endif
361
362   if (factory != NULL) {
363     gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory));
364   }
365   else {
366     g_warning ("could not register scheduler: optimal");
367   }
368   return TRUE;
369 }
370
371 GST_PLUGIN_DEFINE (
372   GST_VERSION_MAJOR,
373   GST_VERSION_MINOR,
374   "gstopt"COTHREADS_NAME"scheduler",
375   "An optimal scheduler using "COTHREADS_NAME" cothreads",
376   plugin_init,
377   VERSION,
378   GST_LICENSE,
379   GST_PACKAGE,
380   GST_ORIGIN
381 );
382
383
384 static void
385 destroy_chain (GstOptSchedulerChain *chain)
386 {
387   GstOptScheduler *osched;
388   
389   GST_INFO ( "destroy chain %p", chain);
390
391   g_assert (chain->num_groups == 0);
392   g_assert (chain->groups == NULL);
393
394   osched = chain->sched;
395   osched->chains = g_slist_remove (osched->chains, chain);
396
397   gst_object_unref (GST_OBJECT (osched));
398
399   g_free (chain);
400 }
401
402 static GstOptSchedulerChain*
403 create_chain (GstOptScheduler *osched)
404 {
405   GstOptSchedulerChain *chain;
406
407   chain = g_new0 (GstOptSchedulerChain, 1);
408   chain->sched = osched;
409   chain->refcount = 1;
410   chain->flags = GST_OPT_SCHEDULER_CHAIN_DISABLED;
411
412   gst_object_ref (GST_OBJECT (osched));
413   osched->chains = g_slist_prepend (osched->chains, chain);
414
415   GST_INFO ( "new chain %p", chain);
416
417   return chain;
418 }
419
420 static GstOptSchedulerChain*
421 ref_chain (GstOptSchedulerChain *chain)
422 {
423   GST_LOG ("ref chain %p %d->%d", chain, 
424            chain->refcount, chain->refcount+1);
425   chain->refcount++;
426
427   return chain;
428 }
429
430 static GstOptSchedulerChain*
431 unref_chain (GstOptSchedulerChain *chain)
432 {
433   GST_LOG ("unref chain %p %d->%d", chain, 
434            chain->refcount, chain->refcount-1);
435
436   if (--chain->refcount == 0) {
437     destroy_chain (chain);
438     chain = NULL;
439   }
440
441   return chain;
442 }
443
444 static GstOptSchedulerChain*
445 add_to_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
446 {
447   GST_INFO ( "adding group %p to chain %p", group, chain);
448
449   g_assert (group->chain == NULL);
450
451   group = ref_group (group);
452
453   group->chain = ref_chain (chain);
454   chain->groups = g_slist_prepend (chain->groups, group);
455   chain->num_groups++;
456
457   if (GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group)) {
458     chain_group_set_enabled (chain, group, TRUE);
459   }
460
461   return chain;
462 }
463
464 static GstOptSchedulerChain*
465 remove_from_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
466 {
467   GST_INFO ( "removing group %p from chain %p", group, chain);
468
469   if (!chain)
470     return NULL;
471
472   g_assert (group);
473   g_assert (group->chain == chain);
474
475   group->chain = NULL;
476   chain->groups = g_slist_remove (chain->groups, group);
477   chain->num_groups--;
478   unref_group (group);
479
480   if (chain->num_groups == 0) 
481     chain = unref_chain (chain);
482
483   chain = unref_chain (chain);
484   return chain;
485 }
486
487 static GstOptSchedulerChain*
488 merge_chains (GstOptSchedulerChain *chain1, GstOptSchedulerChain *chain2)
489 {
490   GSList *walk;
491
492   g_assert (chain1 != NULL);
493   
494   GST_INFO ( "merging chain %p and %p", chain1, chain2);
495   
496   if (chain1 == chain2 || chain2 == NULL)
497     return chain1;
498
499   ref_chain (chain2);
500   walk = chain2->groups;
501   while (walk) {
502     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
503     walk = g_slist_next (walk);
504
505     GST_INFO ( "reparenting group %p from chain %p to %p", 
506                     group, chain2, chain1);
507
508     group->chain = NULL;
509     chain2->num_groups--;
510     chain2 = unref_chain (chain2);
511
512     group->chain = ref_chain (chain1);
513     chain1->groups = g_slist_prepend (chain1->groups, group);
514     chain1->num_groups++;
515   }
516   g_slist_free (chain2->groups);
517   chain2->groups = NULL;
518   unref_chain (chain2);
519
520   return chain1;
521 }
522
523 static void
524 chain_group_set_enabled (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group, gboolean enabled)
525 {
526   g_assert (chain != NULL);
527   g_assert (group != NULL);
528
529   GST_INFO ( "request to %d group %p in chain %p, have %d groups enabled out of %d", 
530                   enabled, group, chain, chain->num_enabled, chain->num_groups);
531
532   if (enabled)
533     GST_OPT_SCHEDULER_GROUP_ENABLE (group);
534   else 
535     GST_OPT_SCHEDULER_GROUP_DISABLE (group);
536
537   if (enabled) {
538     if (chain->num_enabled < chain->num_groups)
539       chain->num_enabled++;
540
541     GST_INFO ( "enable group %p in chain %p, now %d groups enabled out of %d", group, chain,
542                     chain->num_enabled, chain->num_groups);
543
544     if (chain->num_enabled == chain->num_groups) {
545       GST_INFO ( "enable chain %p", chain);
546       GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
547     }
548   }
549   else {
550     if (chain->num_enabled > 0)
551       chain->num_enabled--;
552
553     GST_INFO ( "disable group %p in chain %p, now %d groups enabled out of %d", group, chain,
554                     chain->num_enabled, chain->num_groups);
555
556     if (chain->num_enabled == 0) {
557       GST_INFO ( "disable chain %p", chain);
558       GST_OPT_SCHEDULER_CHAIN_DISABLE (chain);
559     }
560   }
561 }
562
563 /* recursively migrate the group and all connected groups into the new chain */
564 static void
565 chain_recursively_migrate_group (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
566 {
567   GSList *links;
568   
569   /* group already in chain */
570   if (group->chain == chain)
571     return;
572
573   /* first remove the group from its old chain */
574   remove_from_chain (group->chain, group);
575   /* add to new chain */
576   add_to_chain (chain, group);
577
578   /* then follow all links */
579   links = group->group_links;
580   while (links) {
581     GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) links->data;
582     links = g_slist_next (links);
583
584     chain_recursively_migrate_group (chain, (link->group1 == group ? link->group2 : link->group1));
585   }
586 }
587
588 static GstOptSchedulerGroup*
589 ref_group (GstOptSchedulerGroup *group)
590 {
591   GST_LOG ("ref group %p %d->%d", group, 
592            group->refcount, group->refcount+1);
593
594   group->refcount++;
595
596   return group;
597 }
598
599 #ifndef USE_COTHREADS
600 /* remove me
601 static GstOptSchedulerGroup*
602 ref_group_by_count (GstOptSchedulerGroup *group, gint count)
603 {
604   GST_LOG ("ref group %p %d->%d", group, 
605            group->refcount, group->refcount+count);
606
607   group->refcount += count;
608
609   return group;
610 }
611 */
612 #endif
613
614 static GstOptSchedulerGroup*
615 unref_group (GstOptSchedulerGroup *group)
616 {
617   GST_LOG ("unref group %p %d->%d", group, 
618            group->refcount, group->refcount-1);
619
620   if (--group->refcount == 1) {
621     destroy_group (group);
622     group = NULL;
623   }
624
625   return group;
626 }
627
628 static GstOptSchedulerGroup*
629 add_to_group (GstOptSchedulerGroup *group, GstElement *element)
630 {
631   g_assert (group != NULL);
632   g_assert (element != NULL);
633
634   GST_INFO ( "adding element \"%s\" to group %p", GST_ELEMENT_NAME (element), group);
635
636   if (GST_ELEMENT_IS_DECOUPLED (element)) {
637     GST_INFO ( "element \"%s\" is decoupled, not adding to group %p", 
638               GST_ELEMENT_NAME (element), group);
639     return group;
640   }
641
642   g_assert (GST_ELEMENT_SCHED_GROUP (element) == NULL);
643
644   GST_ELEMENT_SCHED_GROUP (element) = ref_group (group);
645
646   gst_object_ref (GST_OBJECT (element));
647   group->elements = g_slist_prepend (group->elements, element);
648   group->num_elements++;
649
650   if (gst_element_get_state (element) == GST_STATE_PLAYING) {
651     group_element_set_enabled (group, element, TRUE);
652   }
653
654   /* Ref the group... */
655   ref_group (group);
656   
657   return group;
658 }
659
660 static GstOptSchedulerGroup*
661 create_group (GstOptSchedulerChain *chain, GstElement *element)
662 {
663   GstOptSchedulerGroup *group;
664
665   group = g_new0 (GstOptSchedulerGroup, 1);
666   GST_INFO ( "new group %p", group);
667   group->refcount = 1;
668   group->flags = GST_OPT_SCHEDULER_GROUP_DISABLED;
669
670   add_to_group (group, element);
671   add_to_chain (chain, group);
672   
673   return group;
674 }
675
676 static void 
677 destroy_group_scheduler (GstOptSchedulerGroup *group) 
678 {
679   g_assert (group);
680
681   if (group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)
682     g_warning ("destroying running group scheduler");
683
684 #ifdef USE_COTHREADS
685   if (group->cothread) {
686     do_cothread_destroy (group->cothread);
687     group->cothread = NULL;
688   }
689 #else
690   group->schedulefunc = NULL;
691   group->argc = 0;
692   group->argv = NULL;
693 #endif
694
695   group->flags &= ~GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
696 }
697
698 static void
699 destroy_group (GstOptSchedulerGroup *group)
700 {
701   GST_INFO ( "destroy group %p", group);
702
703   g_assert (group != NULL);
704   g_assert (group->elements == NULL);
705
706   remove_from_chain (group->chain, group);
707
708   if (group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)
709     destroy_group_scheduler (group);
710
711   g_free (group);
712 }
713
714 static GstOptSchedulerGroup*
715 remove_from_group (GstOptSchedulerGroup *group, GstElement *element)
716 {
717   GST_INFO ( "removing element \"%s\" from group %p", GST_ELEMENT_NAME (element), group);
718
719   g_assert (group != NULL);
720   g_assert (element != NULL);
721   g_assert (GST_ELEMENT_SCHED_GROUP (element) == group);
722
723   group->elements = g_slist_remove (group->elements, element);
724   group->num_elements--;
725
726   /* if the element was an entry point in the group, clear the group's
727    * entry point */
728   if (group->entry == element) {
729     group->entry = NULL;
730   }
731
732   GST_ELEMENT_SCHED_GROUP (element) = NULL;
733   gst_object_unref (GST_OBJECT (element));
734
735   if (group->num_elements == 0) {
736     group = unref_group (group);
737   }
738   group = unref_group (group);
739
740   return group;
741 }
742
743 static GstOptSchedulerGroup*
744 merge_groups (GstOptSchedulerGroup *group1, GstOptSchedulerGroup *group2)
745 {
746   g_assert (group1 != NULL);
747
748   GST_INFO ( "merging groups %p and %p", group1, group2);
749   
750   if (group1 == group2 || group2 == NULL)
751     return group1;
752
753   while (group2 && group2->elements) {
754     GstElement *element = (GstElement *)group2->elements->data;
755
756     group2 = remove_from_group (group2, element);
757     add_to_group (group1, element);
758   }
759   
760   return group1;
761 }
762
763 static void
764 group_error_handler (GstOptSchedulerGroup *group) 
765 {
766   GST_INFO ( "group %p has errored", group);
767
768   chain_group_set_enabled (group->chain, group, FALSE);
769   group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
770 }
771
772 /* this function enables/disables an element, it will set/clear a flag on the element 
773  * and tells the chain that the group is enabled if all elements inside the group are
774  * enabled */
775 static void
776 group_element_set_enabled (GstOptSchedulerGroup *group, GstElement *element, gboolean enabled)
777 {
778   g_assert (group != NULL);
779   g_assert (element != NULL);
780
781   GST_INFO ( "request to %d element %s in group %p, have %d elements enabled out of %d", 
782                     enabled, GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
783
784   if (enabled) {
785     if (group->num_enabled < group->num_elements)
786       group->num_enabled++;
787
788     GST_INFO ( "enable element %s in group %p, now %d elements enabled out of %d", 
789                     GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
790
791     if (group->num_enabled == group->num_elements) {
792       GST_INFO ( "enable group %p", group);
793       chain_group_set_enabled (group->chain, group, TRUE);
794     }
795   }
796   else {
797     if (group->num_enabled > 0)
798       group->num_enabled--;
799
800     GST_INFO ( "disable element %s in group %p, now %d elements enabled out of %d", 
801                     GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
802
803     if (group->num_enabled == 0) {
804       GST_INFO ( "disable group %p", group);
805       chain_group_set_enabled (group->chain, group, FALSE);
806     }
807   }
808 }
809
810 /* a group is scheduled by doing a cothread switch to it or
811  * by calling the schedule function. In the non-cothread case
812  * we cannot run already running groups so we return FALSE here
813  * to indicate this to the caller */
814 static gboolean 
815 schedule_group (GstOptSchedulerGroup *group) 
816 {
817   if (!group->entry) {
818     GST_INFO ( "not scheduling group %p without entry", group);
819     return FALSE;
820   }
821
822 #ifdef USE_COTHREADS
823   if (group->cothread)
824     do_cothread_switch (group->cothread);
825   else
826     g_warning ("(internal error): trying to schedule group without cothread");
827   return TRUE;
828 #else
829   /* cothreads automatically call the pre- and post-run functions for us;
830    * without cothreads we need to call them manually */
831   if (group->schedulefunc == NULL) {
832     GST_INFO ( "not scheduling group %p without schedulefunc", 
833                     group);
834     return FALSE;
835   } else {
836     GSList *l;
837
838     for (l=group->elements; l; l=l->next) {
839       GstElement *e = (GstElement*)l->data;
840       if (e->pre_run_func)
841         e->pre_run_func (e);
842     }
843
844     group->schedulefunc (group->argc, group->argv);
845
846     for (l=group->elements; l; l=l->next) {
847       GstElement *e = (GstElement*)l->data;
848       if (e->post_run_func)
849         e->post_run_func (e);
850     }
851
852   }
853   return TRUE;
854 #endif
855 }
856
857 #ifndef USE_COTHREADS
858 static void
859 gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
860 {
861   GST_LOG_OBJECT (osched, "entering scheduler run queue recursion %d %d", 
862                   osched->recursion, g_list_length (osched->runqueue));
863
864   /* make sure we don't exceed max_recursion */
865   if (osched->recursion > osched->max_recursion) {
866     osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
867     return;
868   }
869
870   osched->recursion++;
871
872   while (osched->runqueue) {
873     GstOptSchedulerGroup *group;
874     gboolean res;
875     
876     group = (GstOptSchedulerGroup *) osched->runqueue->data;
877
878     /* runqueue hols refcount to group */
879     osched->runqueue = g_list_remove (osched->runqueue, group);
880
881     GST_LOG_OBJECT (osched, "scheduling group %p", group);
882
883     res = schedule_group (group);
884     if (!res) {
885       g_warning  ("error scheduling group %p", group);
886       group_error_handler (group);
887     }
888     else {
889       GST_LOG_OBJECT (osched, "done scheduling group %p", group);
890     }
891     unref_group (group);
892   }
893
894   GST_LOG_OBJECT (osched, "run queue length after scheduling %d", g_list_length (osched->runqueue));
895
896   osched->recursion--;
897 }
898 #endif
899
900 /* a chain is scheduled by picking the first active group and scheduling it */
901 static void
902 schedule_chain (GstOptSchedulerChain *chain) 
903 {
904   GSList *groups;
905   GstOptScheduler *osched;
906
907   osched = chain->sched;
908   groups = chain->groups;
909
910   while (groups) {
911     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
912
913     if (!GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group)) {
914       ref_group (group);
915       GST_LOG ("scheduling group %p in chain %p", 
916                group, chain);
917
918 #ifdef USE_COTHREADS
919       schedule_group (group);
920 #else
921       osched->recursion = 0;
922       if (!g_list_find (osched->runqueue, group))
923       {
924         ref_group (group);
925         osched->runqueue = g_list_append (osched->runqueue, group);
926       }
927       gst_opt_scheduler_schedule_run_queue (osched);
928 #endif
929
930       GST_LOG ("done scheduling group %p in chain %p", 
931                group, chain);
932       unref_group (group);
933       break;
934     }
935
936     groups = g_slist_next (groups);
937   }
938 }
939
940 /* a get-based group is scheduled by getting a buffer from the get based
941  * entry point and by pushing the buffer to the peer.
942  * We also set the running flag on this group for as long as this
943  * function is running. */
944 static int
945 get_group_schedule_function (int argc, char *argv[])
946 {
947   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
948   GstElement *entry = group->entry;
949   const GList *pads = gst_element_get_pad_list (entry);
950
951   GST_LOG ("get wrapper of group %p", group);
952
953   group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
954
955   while (pads) {
956     GstData *data;
957     GstPad *pad = GST_PAD (pads->data);
958     pads = g_list_next (pads);
959
960     /* skip sinks and ghostpads */
961     if (!GST_PAD_IS_SRC (pad) || !GST_IS_REAL_PAD (pad))
962       continue;
963
964     GST_LOG ("doing get and push on pad \"%s:%s\" in group %p", 
965              GST_DEBUG_PAD_NAME (pad), group);
966
967     data = GST_RPAD_GETFUNC (pad) (pad);
968     if (data) {
969       if (GST_EVENT_IS_INTERRUPT (data)) {
970         gst_event_unref (GST_EVENT (data));
971         break;
972       }
973       gst_pad_push (pad, data);
974     }
975   }
976
977   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
978
979   return 0;
980 }
981
982 /* a loop-based group is scheduled by calling the loop function
983  * on the entry point. 
984  * We also set the running flag on this group for as long as this
985  * function is running. */
986 static int
987 loop_group_schedule_function (int argc, char *argv[])
988 {
989   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
990   GstElement *entry = group->entry;
991
992   GST_LOG ("loop wrapper of group %p", group);
993
994   group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
995
996   GST_LOG ("calling loopfunc of element %s in group %p", 
997            GST_ELEMENT_NAME (entry), group);
998
999   entry->loopfunc (entry);
1000
1001   GST_LOG ("loopfunc ended of element %s in group %p", 
1002            GST_ELEMENT_NAME (entry), group);
1003
1004   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
1005
1006   return 0;
1007
1008 }
1009
1010 /* the function to schedule an unknown group, which just gives an error */
1011 static int
1012 unknown_group_schedule_function (int argc, char *argv[])
1013 {
1014   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1015
1016   g_warning ("(internal error) unknown group type %d, disabling\n", group->type);
1017   group_error_handler (group);
1018
1019   return 0;
1020 }
1021
1022 /* this function is called when the first element of a chain-loop or a loop-loop
1023  * link performs a push to the loop element. We then schedule the
1024  * group with the loop-based element until the bufpen is empty */
1025 static void
1026 gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstData *data)
1027 {
1028   GstOptSchedulerGroup *group;
1029   GstOptScheduler *osched;
1030
1031   GST_LOG ("loop wrapper, putting buffer in bufpen");
1032
1033   group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (sinkpad));
1034   osched = group->chain->sched;
1035
1036
1037 #ifdef USE_COTHREADS
1038   if (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad))) {
1039     g_warning ("deadlock detected, disabling group %p", group);
1040     group_error_handler (group);
1041   }
1042   else {
1043     GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), data);
1044     schedule_group (group);
1045   }
1046 #else
1047   GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), data);
1048   if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
1049     GST_LOG ("adding %p to runqueue", group);
1050     if (!g_list_find (osched->runqueue, group))
1051     {
1052       ref_group (group);
1053       osched->runqueue = g_list_append (osched->runqueue, group);
1054     }
1055   }
1056 #endif
1057   
1058   GST_LOG ("after loop wrapper buflist %d", 
1059             g_list_length (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad))));
1060 }
1061
1062 /* this function is called by a loop based element that performs a
1063  * pull on a sinkpad. We schedule the peer group until the bufpen
1064  * is filled with the buffer so that this function  can return */
1065 static GstData*
1066 gst_opt_scheduler_get_wrapper (GstPad *srcpad)
1067 {
1068   GstData *data;
1069   GstOptSchedulerGroup *group;
1070   GstOptScheduler *osched;
1071   gboolean disabled;
1072     
1073   GST_LOG ("get wrapper, removing buffer from bufpen");
1074
1075   /* first try to grab a queued buffer */
1076   if (GST_PAD_BUFLIST (srcpad)) {
1077     data = GST_PAD_BUFLIST (srcpad)->data;
1078     GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), data);
1079     
1080     GST_LOG ("get wrapper, returning queued data %d",
1081              g_list_length (GST_PAD_BUFLIST (srcpad)));
1082
1083     return data;
1084   }
1085
1086   /* else we need to schedule the peer element */
1087   group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (srcpad));
1088   osched = group->chain->sched;
1089   data = NULL;
1090   disabled = FALSE;
1091
1092   do {
1093 #ifdef USE_COTHREADS
1094     schedule_group (group);
1095 #else
1096     if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
1097       ref_group (group);
1098
1099       if (!g_list_find (osched->runqueue, group))
1100       {
1101         ref_group (group);
1102         osched->runqueue = g_list_append (osched->runqueue, group);
1103       }
1104
1105       GST_LOG_OBJECT (osched, "recursing into scheduler group %p", group);
1106       gst_opt_scheduler_schedule_run_queue (osched);
1107       GST_LOG_OBJECT (osched, "return from recurse group %p", group);
1108
1109       /* if the other group was disabled we might have to break out of the loop */
1110       disabled = GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group);
1111       group = unref_group (group);
1112       /* group is gone */
1113       if (group == NULL) {
1114         /* if the group was gone we also might have to break out of the loop */
1115         disabled = TRUE;
1116       }
1117     }
1118     else {
1119       /* in this case, the group was running and we wanted to swtich to it,
1120        * this is not allowed in the optimal scheduler (yet) */
1121       g_warning ("deadlock detected, disabling group %p", group);
1122       group_error_handler (group);
1123       return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1124     }
1125 #endif
1126     /* if the scheduler interrupted, make sure we send an INTERRUPTED event to the
1127      * loop based element */
1128     if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
1129       GST_INFO ( "scheduler interrupted, return interrupt event");
1130       data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1131     }
1132     else {
1133       if (GST_PAD_BUFLIST (srcpad)) {
1134         data = GST_PAD_BUFLIST (srcpad)->data;
1135         GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), data);
1136       }
1137       else if (disabled) {
1138         /* no buffer in queue and peer group was disabled */
1139         data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1140       }
1141     }
1142   }
1143   while (data == NULL);
1144
1145   GST_LOG ("get wrapper, returning data %p, queue length %d",
1146            data, g_list_length (GST_PAD_BUFLIST (srcpad)));
1147
1148   return data;
1149 }
1150
1151 /* this function is a chain wrapper for non-event-aware plugins,
1152  * it'll simply dispatch the events to the (default) event handler */
1153 static void
1154 gst_opt_scheduler_chain_wrapper (GstPad *sinkpad, GstData *data)
1155 {
1156   if (GST_IS_EVENT (data)) {
1157     gst_pad_send_event (sinkpad, GST_EVENT (data));
1158   }
1159   else {
1160     GST_RPAD_CHAINFUNC (sinkpad) (sinkpad, data);
1161   }
1162 }
1163
1164 static void
1165 clear_queued (GstData *data, gpointer user_data)
1166 {
1167   gst_data_unref (data);
1168 }
1169
1170 static void
1171 pad_clear_queued (GstPad *srcpad, gpointer user_data)
1172 {
1173   GList *buflist = GST_PAD_BUFLIST (srcpad);
1174
1175   if (buflist) {
1176     GST_INFO ( "need to clear some buffers");
1177     g_list_foreach (buflist, (GFunc) clear_queued, NULL);
1178     g_list_free (buflist);
1179     GST_PAD_BUFLIST (srcpad) = NULL;
1180   }
1181 }
1182
1183 static gboolean
1184 gst_opt_scheduler_event_wrapper (GstPad *srcpad, GstEvent *event)
1185 {
1186   gboolean flush;
1187
1188   GST_LOG ("intercepting event %d on pad %s:%s", 
1189            GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (srcpad));
1190   
1191   /* figure out if this is a flush event */
1192   switch (GST_EVENT_TYPE (event)) {
1193     case GST_EVENT_FLUSH:
1194       flush = TRUE;
1195       break;
1196     case GST_EVENT_SEEK:
1197     case GST_EVENT_SEEK_SEGMENT:
1198       flush = GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH;
1199       break;
1200     default:
1201       flush = FALSE;
1202       break;
1203   }
1204
1205   if (flush) {
1206     GST_LOG ("event is flush");
1207
1208     pad_clear_queued (srcpad, NULL);
1209   }
1210   return GST_RPAD_EVENTFUNC (srcpad) (srcpad, event);
1211 }
1212
1213
1214 /* setup the scheduler context for a group. The right schedule function
1215  * is selected based on the group type and cothreads are created if 
1216  * needed */
1217 static void 
1218 setup_group_scheduler (GstOptScheduler *osched, GstOptSchedulerGroup *group) 
1219 {
1220   GroupScheduleFunction wrapper;
1221
1222   wrapper = unknown_group_schedule_function;
1223
1224   /* figure out the wrapper function for this group */
1225   if (group->type == GST_OPT_SCHEDULER_GROUP_GET)
1226     wrapper = get_group_schedule_function;
1227   else if (group->type == GST_OPT_SCHEDULER_GROUP_LOOP)
1228     wrapper = loop_group_schedule_function;
1229         
1230 #ifdef USE_COTHREADS
1231   if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
1232     do_cothread_create (group->cothread, osched->context,
1233                       (cothread_func) wrapper, 0, (char **) group);
1234   }
1235   else {
1236     do_cothread_setfunc (group->cothread, osched->context,
1237                       (cothread_func) wrapper, 0, (char **) group);
1238   }
1239 #else
1240   group->schedulefunc = wrapper;
1241   group->argc = 0;
1242   group->argv = (char **) group;
1243 #endif
1244   group->flags |= GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
1245 }
1246
1247 static GstElementStateReturn
1248 gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition)
1249 {
1250   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1251   GstOptSchedulerGroup *group;
1252   GstElementStateReturn res = GST_STATE_SUCCESS;
1253   
1254   GST_INFO ( "element \"%s\" state change %d", GST_ELEMENT_NAME (element), transition);
1255
1256   /* we check the state of the managing pipeline here */
1257   if (GST_IS_BIN (element)) {
1258     if (GST_SCHEDULER_PARENT (sched) == element) {
1259       GST_INFO ( "parent \"%s\" changed state", GST_ELEMENT_NAME (element));
1260
1261       switch (transition) {
1262         case GST_STATE_PLAYING_TO_PAUSED:
1263           GST_INFO ( "setting scheduler state to stopped");
1264           GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_STOPPED;
1265           break;
1266         case GST_STATE_PAUSED_TO_PLAYING:
1267           GST_INFO ( "setting scheduler state to running");
1268           GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_RUNNING;
1269           break;
1270         default:
1271           GST_INFO ( "no interesting state change, doing nothing");
1272       }
1273     }
1274     return res;
1275   }
1276
1277   /* we don't care about decoupled elements after this */
1278   if (GST_ELEMENT_IS_DECOUPLED (element))
1279     return GST_STATE_SUCCESS;
1280
1281   /* get the group of the element */
1282   group = GST_ELEMENT_SCHED_GROUP (element);
1283
1284   switch (transition) {
1285     case GST_STATE_PAUSED_TO_PLAYING:
1286       /* an element withut a group has to be an unlinked src, sink
1287        * filter element */
1288       if (!group) {
1289         GST_INFO ( "element \"%s\" has no group", GST_ELEMENT_NAME (element));
1290         res = GST_STATE_FAILURE;
1291       }
1292       /* else construct the scheduling context of this group and enable it */
1293       else {
1294         setup_group_scheduler (osched, group);
1295         group_element_set_enabled (group, element, TRUE);
1296       }
1297       break;
1298     case GST_STATE_PLAYING_TO_PAUSED:
1299       /* if the element still has a group, we disable it */
1300       if (group) 
1301         group_element_set_enabled (group, element, FALSE);
1302       break;
1303     case GST_STATE_PAUSED_TO_READY:
1304     {  
1305       GList *pads = (GList *) gst_element_get_pad_list (element);
1306
1307       g_list_foreach (pads, (GFunc) pad_clear_queued, NULL);
1308       break;
1309     }
1310     default:
1311       break;
1312   }
1313
1314   return res;
1315 }
1316
1317 static void
1318 gst_opt_scheduler_scheduling_change (GstScheduler *sched, GstElement *element)
1319 {
1320   g_warning ("scheduling change, implement me");
1321 }
1322
1323 static void
1324 get_group (GstElement *element, GstOptSchedulerGroup **group)
1325 {
1326   GstOptSchedulerCtx *ctx;
1327   /*GList *pads;*/
1328
1329   ctx = GST_ELEMENT_SCHED_CONTEXT (element);
1330   if (ctx) 
1331     *group = ctx->group;
1332   else
1333     *group = NULL;
1334 }
1335
1336 /*
1337  * the idea is to put the two elements into the same group. 
1338  * - When no element is inside a group, we create a new group and add 
1339  *   the elements to it. 
1340  * - When one of the elements has a group, add the other element to 
1341  *   that group
1342  * - if both of the elements have a group, we merge the groups, which
1343  *   will also merge the chains.
1344  */
1345 static GstOptSchedulerGroup*
1346 group_elements (GstOptScheduler *osched, GstElement *element1, GstElement *element2)
1347 {
1348   GstOptSchedulerGroup *group1, *group2, *group = NULL;
1349   
1350   get_group (element1, &group1);
1351   get_group (element2, &group2);
1352   
1353   /* none of the elements is added to a group, create a new group
1354    * and chain to add the elements to */
1355   if (!group1 && !group2) {
1356     GstOptSchedulerChain *chain;
1357
1358     GST_INFO ( "creating new group to hold \"%s\" and \"%s\"", 
1359                   GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
1360
1361     chain = create_chain (osched);
1362     group = create_group (chain, element1);
1363     add_to_group (group, element2);
1364   }
1365   /* the first element has a group */
1366   else if (group1) {
1367     GST_INFO ( "adding \"%s\" to \"%s\"'s group", 
1368                   GST_ELEMENT_NAME (element2), GST_ELEMENT_NAME (element1));
1369
1370     /* the second element also has a group, merge */
1371     if (group2)
1372       merge_groups (group1, group2);
1373     /* the second element has no group, add it to the group
1374      * of the first element */
1375     else
1376       add_to_group (group1, element2);
1377
1378     group = group1;
1379   }
1380   /* element1 has no group, element2 does. Add element1 to the
1381    * group of element2 */
1382   else {
1383     GST_INFO ( "adding \"%s\" to \"%s\"'s group", 
1384                   GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
1385     add_to_group (group2, element1);
1386     group = group2;
1387   }
1388   return group;
1389 }
1390
1391 /*
1392  * increment link counts between groups
1393  */
1394 static void
1395 group_inc_link (GstOptSchedulerGroup *group1, GstOptSchedulerGroup *group2)
1396 {
1397   GSList *links = group1->group_links;
1398   gboolean done = FALSE;
1399   GstOptSchedulerGroupLink *link;
1400
1401   /* first try to find a previous link */
1402   while (links && !done) {
1403     link = (GstOptSchedulerGroupLink *) links->data;
1404     links = g_slist_next (links);
1405     
1406     if (IS_GROUP_LINK (link, group1, group2)) {
1407       /* we found a link to this group, increment the link count */
1408       link->count++;
1409       GST_INFO ( "incremented group link count between %p and %p to %d", 
1410                   group1, group2, link->count);
1411       done = TRUE;
1412     }
1413   }
1414   if (!done) {
1415     /* no link was found, create a new one */
1416     link = g_new0 (GstOptSchedulerGroupLink, 1);
1417
1418     link->group1 = group1;
1419     link->group2 = group2;
1420     link->count = 1;
1421
1422     group1->group_links = g_slist_prepend (group1->group_links, link);
1423     group2->group_links = g_slist_prepend (group2->group_links, link);
1424
1425     GST_INFO ( "added group link count between %p and %p", 
1426                   group1, group2);
1427   }
1428 }
1429
1430 /*
1431  * decrement link counts between groups, returns TRUE if the link count reaches 0
1432  */
1433 static gboolean
1434 group_dec_link (GstOptSchedulerGroup *group1, GstOptSchedulerGroup *group2)
1435 {
1436   GSList *links = group1->group_links;
1437   gboolean res = FALSE;
1438   GstOptSchedulerGroupLink *link;
1439
1440   while (links) {
1441     link = (GstOptSchedulerGroupLink *) links->data;
1442     links = g_slist_next (links);
1443     
1444     if (IS_GROUP_LINK (link, group1, group2)) {
1445       link->count--;
1446       GST_INFO ( "link count between %p and %p is now %d", 
1447                   group1, group2, link->count);
1448       if (link->count == 0) {
1449         group1->group_links = g_slist_remove (group1->group_links, link);
1450         group2->group_links = g_slist_remove (group2->group_links, link);
1451         g_free (link);
1452         GST_INFO ( "removed group link between %p and %p", 
1453                   group1, group2);
1454         res = TRUE;
1455       }
1456       break;
1457     }
1458   }
1459   return res;
1460 }
1461
1462
1463 typedef enum {
1464   GST_OPT_INVALID,
1465   GST_OPT_GET_TO_CHAIN,
1466   GST_OPT_LOOP_TO_CHAIN,
1467   GST_OPT_GET_TO_LOOP,
1468   GST_OPT_CHAIN_TO_CHAIN,
1469   GST_OPT_CHAIN_TO_LOOP,
1470   GST_OPT_LOOP_TO_LOOP,
1471 } LinkType;
1472
1473 /*
1474  * Entry points for this scheduler.
1475  */
1476 static void
1477 gst_opt_scheduler_setup (GstScheduler *sched)
1478 {   
1479 #ifdef USE_COTHREADS
1480   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1481               
1482   /* first create thread context */
1483   if (osched->context == NULL) {
1484     GST_DEBUG ( "initializing cothread context");
1485     osched->context = do_cothread_context_init ();
1486   }
1487 #endif
1488
1489   
1490 static void 
1491 gst_opt_scheduler_reset (GstScheduler *sched)
1492
1493 #ifdef USE_COTHREADS
1494   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1495   GSList *chains = osched->chains;
1496
1497   while (chains) {
1498     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
1499     GSList *groups = chain->groups;
1500
1501     while (groups) {
1502       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
1503
1504       destroy_group_scheduler (group);
1505       groups = groups->next;
1506     }
1507     chains = chains->next;
1508   }
1509               
1510   if (osched->context) {
1511     do_cothread_context_destroy (osched->context);
1512     osched->context = NULL; 
1513   }
1514 #endif
1515 }     
1516 static void
1517 gst_opt_scheduler_add_element (GstScheduler *sched, GstElement *element)
1518 {
1519   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1520   GstOptSchedulerCtx *ctx;
1521   const GList *pads; 
1522
1523   GST_INFO ( "adding element \"%s\" to scheduler", GST_ELEMENT_NAME (element));
1524
1525   /* decoupled elements are not added to the scheduler lists */
1526   if (GST_ELEMENT_IS_DECOUPLED (element))
1527     return;
1528
1529   ctx = g_new0 (GstOptSchedulerCtx, 1);
1530   GST_ELEMENT_SCHED_CONTEXT (element) = ctx;
1531   ctx->flags = GST_OPT_SCHEDULER_CTX_DISABLED;
1532
1533   /* set event handler on all pads here so events work unconnected too;
1534    * in _link, it can be overruled if need be */
1535   /* FIXME: we should also do this when new pads on the element are created;
1536      but there are no hooks, so we do it again in _link */
1537   pads = gst_element_get_pad_list (element);
1538   while (pads) {
1539     GstPad *pad = GST_PAD (pads->data);
1540     pads = g_list_next (pads);
1541
1542     if (!GST_IS_REAL_PAD (pad)) continue;
1543     GST_RPAD_EVENTHANDLER (pad) = GST_RPAD_EVENTFUNC (pad);
1544   }
1545
1546   /* loop based elements *always* end up in their own group. It can eventually
1547    * be merged with another group when a link is made */
1548   if (element->loopfunc) {
1549     GstOptSchedulerGroup *group;
1550     GstOptSchedulerChain *chain;
1551
1552     chain = create_chain (osched);
1553
1554     group = create_group (chain, element);
1555     group->entry = element;
1556     group->type = GST_OPT_SCHEDULER_GROUP_LOOP;
1557
1558     GST_INFO ( "added element \"%s\" as loop based entry", GST_ELEMENT_NAME (element));
1559   }
1560 }
1561
1562 static void
1563 gst_opt_scheduler_remove_element (GstScheduler *sched, GstElement *element)
1564 {
1565   GstOptSchedulerGroup *group;
1566
1567   GST_INFO ( "removing element \"%s\" from scheduler", GST_ELEMENT_NAME (element));
1568
1569   /* decoupled elements are not added to the scheduler lists and should therefore
1570    * no be removed */
1571   if (GST_ELEMENT_IS_DECOUPLED (element))
1572     return;
1573
1574   /* the element is guaranteed to live in it's own group/chain now */
1575   get_group (element, &group);
1576   if (group) {
1577     remove_from_group (group, element);
1578   }
1579
1580   g_free (GST_ELEMENT_SCHED_CONTEXT (element));
1581   GST_ELEMENT_SCHED_CONTEXT (element) = NULL;
1582 }
1583
1584 static void
1585 gst_opt_scheduler_lock_element (GstScheduler *sched, GstElement *element)
1586 {
1587   //GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1588   g_warning ("lock element, implement me");
1589 }
1590
1591 static void
1592 gst_opt_scheduler_unlock_element (GstScheduler *sched, GstElement *element)
1593 {
1594   //GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1595   g_warning ("unlock element, implement me");
1596 }
1597
1598 static gboolean
1599 gst_opt_scheduler_yield (GstScheduler *sched, GstElement *element)
1600 {
1601 #ifdef USE_COTHREADS
1602   /* yield hands control to the main cothread context if the requesting 
1603    * element is the entry point of the group */
1604   GstOptSchedulerGroup *group;
1605   get_group (element, &group);
1606   if (group && group->entry == element)
1607     do_cothread_switch (do_cothread_get_main (((GstOptScheduler*)sched)->context)); 
1608
1609   return FALSE;
1610 #else
1611   g_warning ("element %s performs a yield, please fix the element", 
1612                   GST_ELEMENT_NAME (element));
1613   return TRUE;
1614 #endif
1615 }
1616
1617 static gboolean
1618 gst_opt_scheduler_interrupt (GstScheduler *sched, GstElement *element)
1619 {
1620   GST_INFO ( "interrupt from \"%s\"", 
1621             GST_ELEMENT_NAME (element));
1622
1623 #ifdef USE_COTHREADS
1624   do_cothread_switch (do_cothread_get_main (((GstOptScheduler*)sched)->context)); 
1625   return FALSE;
1626 #else
1627   {
1628     GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1629  
1630     GST_INFO ( "scheduler set interrupted state");
1631     osched->state = GST_OPT_SCHEDULER_STATE_INTERRUPTED;
1632   }
1633   return TRUE;
1634 #endif
1635 }
1636
1637 static void
1638 gst_opt_scheduler_error (GstScheduler *sched, GstElement *element)
1639 {
1640   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1641   GstOptSchedulerGroup *group;
1642   get_group (element, &group);
1643   if (group)
1644     group_error_handler (group);
1645
1646   osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
1647 }
1648
1649 /* link pads, merge groups and chains */
1650 static void
1651 gst_opt_scheduler_pad_link (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
1652 {
1653   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1654   LinkType type = GST_OPT_INVALID;
1655   GstElement *element1, *element2;
1656
1657   GST_INFO ( "pad link between \"%s:%s\" and \"%s:%s\"", 
1658                   GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
1659
1660   element1 = GST_PAD_PARENT (srcpad);
1661   element2 = GST_PAD_PARENT (sinkpad);
1662
1663   /* first we need to figure out what type of link we're dealing
1664    * with */
1665   if (element1->loopfunc && element2->loopfunc)
1666     type = GST_OPT_LOOP_TO_LOOP;
1667   else {
1668     if (element1->loopfunc) {
1669       if (GST_RPAD_CHAINFUNC (sinkpad))
1670         type = GST_OPT_LOOP_TO_CHAIN;
1671     }
1672     else if (element2->loopfunc) {
1673       if (GST_RPAD_GETFUNC (srcpad)) {
1674         type = GST_OPT_GET_TO_LOOP;
1675         /* this could be tricky, the get based source could 
1676          * already be part of a loop based group in another pad,
1677          * we assert on that for now */
1678         if (GST_ELEMENT_SCHED_CONTEXT (element1) != NULL &&
1679             GST_ELEMENT_SCHED_GROUP (element1) != NULL) 
1680         {
1681           GstOptSchedulerGroup *group = GST_ELEMENT_SCHED_GROUP (element1);
1682
1683           /* if the loop based element is the entry point we're ok, if it
1684            * isn't then we have multiple loop based elements in this group */
1685           if (group->entry != element2) {
1686             g_error ("internal error: cannot schedule get to loop in multi-loop based group");
1687             return;
1688           }
1689         }
1690       }
1691       else
1692         type = GST_OPT_CHAIN_TO_LOOP;
1693     }
1694     else {
1695       if (GST_RPAD_GETFUNC (srcpad) && GST_RPAD_CHAINFUNC (sinkpad)) {
1696         type = GST_OPT_GET_TO_CHAIN;
1697         /* the get based source could already be part of a loop 
1698          * based group in another pad, we assert on that for now */
1699         if (GST_ELEMENT_SCHED_CONTEXT (element1) != NULL &&
1700             GST_ELEMENT_SCHED_GROUP (element1) != NULL) 
1701         {
1702           GstOptSchedulerGroup *group = GST_ELEMENT_SCHED_GROUP (element1);
1703
1704           /* if the get based element is the entry point we're ok, if it
1705            * isn't then we have a mixed loop/chain based group */
1706           if (group->entry != element1) {
1707             g_error ("internal error: cannot schedule get to chain with mixed loop/chain based group");
1708             return;
1709           }
1710         }
1711       }
1712       else 
1713         type = GST_OPT_CHAIN_TO_CHAIN;
1714     }
1715   }
1716  
1717   /* since we can't set event handlers on pad creation after addition, it is
1718    * best we set all of them again to the default before linking */
1719   GST_RPAD_EVENTHANDLER (srcpad) = GST_RPAD_EVENTFUNC (srcpad);
1720   GST_RPAD_EVENTHANDLER (sinkpad) = GST_RPAD_EVENTFUNC (sinkpad);
1721   
1722   /* for each link type, perform specific actions */
1723   switch (type) {
1724     case GST_OPT_GET_TO_CHAIN:
1725     {
1726       GstOptSchedulerGroup *group = NULL;
1727
1728       GST_INFO ( "get to chain based link");
1729
1730       /* setup get/chain handlers */
1731       GST_RPAD_GETHANDLER (srcpad) = GST_RPAD_GETFUNC (srcpad);
1732       if (GST_ELEMENT_IS_EVENT_AWARE (element2))
1733         GST_RPAD_CHAINHANDLER (sinkpad) = GST_RPAD_CHAINFUNC (sinkpad);
1734       else
1735         GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_chain_wrapper;
1736
1737       /* the two elements should be put into the same group, 
1738        * this also means that they are in the same chain automatically */
1739       group = group_elements (osched, element1, element2);
1740
1741       /* if there is not yet an entry in the group, select the source
1742        * element as the entry point */
1743       if (!group->entry) {
1744         group->entry = element1;
1745         group->type = GST_OPT_SCHEDULER_GROUP_GET;
1746
1747         GST_INFO ( "setting \"%s\" as entry point of _get-based group %p", 
1748                   GST_ELEMENT_NAME (element1), group);
1749       }
1750       break;
1751     }
1752     case GST_OPT_LOOP_TO_CHAIN:
1753     case GST_OPT_CHAIN_TO_CHAIN:
1754       GST_INFO ( "loop/chain to chain based link");
1755
1756       if (GST_ELEMENT_IS_EVENT_AWARE (element2))
1757         GST_RPAD_CHAINHANDLER (sinkpad) = GST_RPAD_CHAINFUNC (sinkpad);
1758       else
1759         GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_chain_wrapper;
1760
1761       /* the two elements should be put into the same group, 
1762        * this also means that they are in the same chain automatically, 
1763        * in case of a loop-based element1, there will be a group for element1 and
1764        * element2 will be added to it. */
1765       group_elements (osched, element1, element2);
1766       break;
1767     case GST_OPT_GET_TO_LOOP:
1768       GST_INFO ( "get to loop based link");
1769
1770       GST_RPAD_GETHANDLER (srcpad) = GST_RPAD_GETFUNC (srcpad);
1771
1772       /* the two elements should be put into the same group, 
1773        * this also means that they are in the same chain automatically, 
1774        * element2 is loop-based so it already has a group where element1
1775        * will be added to */
1776       group_elements (osched, element1, element2);
1777       break;
1778     case GST_OPT_CHAIN_TO_LOOP:
1779     case GST_OPT_LOOP_TO_LOOP:
1780     {
1781       GstOptSchedulerGroup *group1, *group2;
1782
1783       GST_INFO ( "chain/loop to loop based link");
1784
1785       GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_loop_wrapper;
1786       GST_RPAD_GETHANDLER (srcpad) = gst_opt_scheduler_get_wrapper;
1787       /* events on the srcpad have to be intercepted as we might need to
1788        * flush the buffer lists, so override the given eventfunc */
1789       GST_RPAD_EVENTHANDLER (srcpad) = gst_opt_scheduler_event_wrapper;
1790
1791       group1 = GST_ELEMENT_SCHED_GROUP (element1);
1792       group2 = GST_ELEMENT_SCHED_GROUP (element2);
1793
1794       g_assert (group2 != NULL);
1795
1796       /* group2 is guaranteed to exist as it contains a loop-based element.
1797        * group1 only exists if element1 is linked to some other element */
1798       if (!group1) {
1799         /* create a new group for element1 as it cannot be merged into another group
1800          * here. we create the group in the same chain as the loop-based element. */
1801         GST_INFO ( "creating new group for element %s", GST_ELEMENT_NAME (element1));
1802         group1 = create_group (group2->chain, element1);
1803       }
1804       else {
1805         /* both elements are already in a group, make sure they are added to
1806          * the same chain */
1807         merge_chains (group1->chain, group2->chain);
1808       }
1809       group_inc_link (group1, group2);
1810       break;
1811     }
1812     case GST_OPT_INVALID:
1813       g_error ("(internal error) invalid element link, what are you doing?");
1814       break;
1815   }
1816 }
1817
1818 /* 
1819  * checks if an element is still linked to some other element in the group. 
1820  * no checking is done on the brokenpad arg 
1821  */
1822 static gboolean
1823 element_has_link_with_group (GstElement *element, GstOptSchedulerGroup *group, GstPad *brokenpad)
1824 {
1825   gboolean linked = FALSE;
1826   const GList *pads;
1827
1828   /* see if the element has no more links to the peer group */
1829   pads = gst_element_get_pad_list (element);
1830   while (pads && !linked) {
1831     GstPad *pad = GST_PAD (pads->data);
1832     pads = g_list_next (pads);
1833
1834     /* we only operate on real pads and on the pad that is not broken */
1835     if (!GST_IS_REAL_PAD (pad) || pad == brokenpad)
1836       continue;
1837
1838     if (GST_PAD_PEER (pad)) {
1839       GstElement *parent;
1840       GstOptSchedulerGroup *parentgroup;
1841
1842       /* see in what group this element is */
1843       parent = GST_PAD_PARENT (GST_PAD_PEER (pad));
1844
1845       /* links with decoupled elements are valid */
1846       if (GST_ELEMENT_IS_DECOUPLED (parent)) {
1847         linked = TRUE;
1848       }
1849       else {
1850         /* for non-decoupled elements we need to check the group */
1851         get_group (parent, &parentgroup);
1852
1853         /* if it's in the same group, we're still linked */
1854         if (parentgroup == group)
1855           linked = TRUE;
1856       }
1857     } 
1858   }
1859   return linked;
1860 }
1861
1862 /* 
1863  * checks if a target group is still reachable from the group without taking the broken
1864  * group link into account.
1865  */
1866 static gboolean
1867 group_can_reach_group (GstOptSchedulerGroup *group, GstOptSchedulerGroup *target)
1868 {
1869   gboolean reachable = FALSE;
1870   const GSList *links = group->group_links;
1871
1872   GST_INFO ( "checking if group %p can reach %p", 
1873                   group, target);
1874
1875   /* seems like we found the target element */
1876   if (group == target) {
1877     GST_INFO ( "found way to reach %p", target);
1878     return TRUE;
1879   }
1880
1881   /* if the group is marked as visited, we don't need to check here */
1882   if (GST_OPT_SCHEDULER_GROUP_IS_FLAG_SET (group, GST_OPT_SCHEDULER_GROUP_VISITED)) {
1883     GST_INFO ( "already visited %p", group);
1884     return FALSE;
1885   }
1886
1887   /* mark group as visited */
1888   GST_OPT_SCHEDULER_GROUP_SET_FLAG (group, GST_OPT_SCHEDULER_GROUP_VISITED);
1889
1890   while (links && !reachable) {
1891     GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) links->data;
1892     GstOptSchedulerGroup *other;
1893
1894     links = g_slist_next (links);
1895
1896     /* find other group in this link */
1897     other = OTHER_GROUP_LINK (link, group);
1898
1899     GST_INFO ( "found link from %p to %p, count %d", 
1900                     group, other, link->count);
1901
1902     /* check if we can reach the target recursiveley */
1903     reachable = group_can_reach_group (other, target);
1904   }
1905   /* unset the visited flag, note that this is not optimal as we might be checking
1906    * groups several times when they are reachable with a loop. An alternative would be
1907    * to not clear the group flag at this stage but clear all flags in the chain when
1908    * all groups are checked. */
1909   GST_OPT_SCHEDULER_GROUP_UNSET_FLAG (group, GST_OPT_SCHEDULER_GROUP_VISITED);
1910
1911   GST_INFO ( "leaving group %p with %s", group, (reachable ? "TRUE":"FALSE"));
1912
1913   return reachable;
1914 }
1915
1916 static void
1917 gst_opt_scheduler_pad_unlink (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
1918 {
1919   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1920   GstElement *element1, *element2;
1921   GstOptSchedulerGroup *group1, *group2;
1922
1923   GST_INFO ( "pad unlink between \"%s:%s\" and \"%s:%s\"", 
1924                   GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
1925
1926   element1 = GST_PAD_PARENT (srcpad);
1927   element2 = GST_PAD_PARENT (sinkpad);
1928   
1929   get_group (element1, &group1);
1930   get_group (element2, &group2);
1931
1932   /* for decoupled elements (that are never put into a group) we use the
1933    * group of the peer element for the remainder of the algorithm */
1934   if (GST_ELEMENT_IS_DECOUPLED (element1)) {
1935     group1 = group2;
1936   }
1937   if (GST_ELEMENT_IS_DECOUPLED (element2)) {
1938     group2 = group1;
1939   }
1940
1941   /* if one the elements has no group (anymore) we don't really care 
1942    * about the link */
1943   if (!group1 || !group2) {
1944     GST_INFO ( "one (or both) of the elements is not in a group, not interesting");
1945     return;
1946   }
1947
1948   /* easy part, groups are different */
1949   if (group1 != group2) {
1950     gboolean zero;
1951
1952     GST_INFO ( "elements are in different groups");
1953
1954     /* we can remove the links between the groups now */
1955     zero = group_dec_link (group1, group2);
1956
1957     /* if the groups are not directly connected anymore, we have to perform a recursive check
1958      * to see if they are really unlinked */
1959     if (zero) {
1960       gboolean still_link;
1961       GstOptSchedulerChain *chain;
1962
1963       /* see if group1 and group2 are still connected in any indirect way */
1964       still_link = group_can_reach_group (group1, group2);
1965
1966       GST_INFO ( "group %p %s reach group %p", group1, (still_link ? "can":"can't"), group2);
1967       if (!still_link) {
1968         /* groups are really disconnected, migrate one group to a new chain */
1969         chain = create_chain (osched);
1970         chain_recursively_migrate_group (chain, group1);
1971
1972         GST_INFO ( "migrated group %p to new chain %p", group1, chain);
1973       }
1974     }
1975     else {
1976       GST_INFO ( "group %p still has direct link with group %p", group1, group2);
1977     }
1978   }
1979   /* hard part, groups are equal */
1980   else {
1981     gboolean still_link1, still_link2;
1982     GstOptSchedulerGroup *group;
1983     
1984     /* since group1 == group2, it doesn't matter which group we take */
1985     group = group1;
1986
1987     GST_INFO ( "elements are in the same group %p", group);
1988
1989     /* check if the element is still linked to some other element in the group,
1990      * we pass the pad that is broken up as an arg because a link on that pad
1991      * is not valid anymore.
1992      * Note that this check is only to make sure that a single element can be removed 
1993      * completely from the group, we also have to check for migrating several 
1994      * elements to a new group. */
1995     still_link1 = element_has_link_with_group (element1, group, srcpad);
1996     still_link2 = element_has_link_with_group (element2, group, sinkpad);
1997
1998     /* if there is still a link, we don't need to break this group */
1999     if (still_link1 && still_link2) {
2000       GST_INFO ( "elements still have links with other elements in the group");
2001       /* FIXME it's possible that we have to break the group/chain. This heppens when
2002        * the src element recursiveley has links with other elements in the group but not 
2003        * with all elements. */
2004       g_warning ("opt: unlink elements in same group: implement me");
2005       return;
2006     }
2007
2008     /* now check which one of the elements we can remove from the group */
2009     if (!still_link1) {
2010       /* we only remove elements that are not the entry point of a loop based
2011        * group and are not decoupled */
2012       if (!(group->entry == element1 &&
2013            group->type == GST_OPT_SCHEDULER_GROUP_LOOP) &&
2014           !GST_ELEMENT_IS_DECOUPLED (element1)) 
2015       {
2016         GST_INFO ( "element1 is separated from the group");
2017         remove_from_group (group, element1);
2018       }
2019       else {
2020         GST_INFO ( "element1 is decoupled or entry in loop based group");
2021       }
2022     }
2023     if (!still_link2) {
2024       /* we only remove elements that are not the entry point of a loop based
2025        * group and are not decoupled */
2026       if (!(group->entry == element2 &&
2027            group->type == GST_OPT_SCHEDULER_GROUP_LOOP) &&
2028           !GST_ELEMENT_IS_DECOUPLED (element2)) 
2029       {
2030         GST_INFO ( "element2 is separated from the group");
2031         remove_from_group (group, element2);
2032       }
2033       else {
2034         GST_INFO ( "element2 is decoupled or entry in loop based group");
2035       }
2036     }
2037   }
2038 }
2039
2040 static void
2041 gst_opt_scheduler_pad_select (GstScheduler *sched, GList *padlist)
2042 {
2043   //GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2044   
2045   g_warning ("pad select, implement me");
2046 }
2047
2048 static GstClockReturn
2049 gst_opt_scheduler_clock_wait (GstScheduler *sched, GstElement *element,
2050                               GstClockID id, GstClockTimeDiff *jitter)
2051 {
2052   return gst_clock_id_wait (id, jitter);
2053 }
2054
2055 /* a scheduler iteration is done by looping and scheduling the active chains */
2056 static GstSchedulerState
2057 gst_opt_scheduler_iterate (GstScheduler *sched)
2058 {
2059   GstSchedulerState state = GST_SCHEDULER_STATE_STOPPED;
2060   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2061   gint iterations = osched->iterations;
2062
2063   osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
2064
2065   while (iterations) {
2066     gboolean scheduled = FALSE;
2067     GSList *chains;
2068
2069     /* we have to schedule each of the scheduler chains now */
2070     chains = osched->chains;
2071     while (chains) {
2072       GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
2073
2074       ref_chain (chain);
2075       /* if the chain is not disabled, schedule it */
2076       if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
2077         schedule_chain (chain);
2078         scheduled = TRUE;
2079       }
2080
2081       /* don't schedule any more chains when in error */
2082       if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
2083         GST_ERROR_OBJECT (sched, "in error state");
2084         break;
2085       } 
2086       else if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
2087         GST_DEBUG_OBJECT (osched, "got interrupted, continue with next chain");
2088         osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
2089       }
2090
2091       GST_LOG_OBJECT (sched, "iterate scheduled %p", chain);
2092
2093       chains = g_slist_next (chains);
2094       unref_chain (chain);
2095     }
2096
2097     /* at this point it's possible that the scheduler state is
2098      * in error, we then return an error */
2099     if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
2100       state = GST_SCHEDULER_STATE_ERROR;
2101       break;
2102     }
2103     else {
2104       /* if chains were scheduled, return our current state */
2105       if (scheduled)
2106         state = GST_SCHEDULER_STATE (sched);
2107       /* if no chains were scheduled, we say we are stopped */
2108       else {
2109         state = GST_SCHEDULER_STATE_STOPPED;
2110         break;
2111       }
2112     }
2113     if (iterations > 0)
2114       iterations--;
2115   }
2116
2117   return state;
2118 }
2119
2120
2121 static void
2122 gst_opt_scheduler_show (GstScheduler *sched)
2123 {
2124   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2125   GSList *chains;
2126
2127   g_print ("iterations:    %d\n", osched->iterations);
2128   g_print ("max recursion: %d\n", osched->max_recursion);
2129
2130   chains = osched->chains;
2131   while (chains) {
2132     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
2133     GSList *groups = chain->groups;
2134     chains = g_slist_next (chains);
2135
2136     g_print ("+- chain %p: refcount %d, %d groups, %d enabled, flags %d\n", 
2137                     chain, chain->refcount, chain->num_groups, chain->num_enabled, chain->flags);
2138
2139     while (groups) {
2140       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
2141       GSList *elements = group->elements;
2142       groups = g_slist_next (groups);
2143
2144       g_print (" +- group %p: refcount %d, %d elements, %d enabled, flags %d, entry %s, %s\n", 
2145                       group, group->refcount, group->num_elements, group->num_enabled, group->flags,
2146                       (group->entry ? GST_ELEMENT_NAME (group->entry): "(none)"),
2147                       (group->type == GST_OPT_SCHEDULER_GROUP_GET ? "get-based" : "loop-based") );
2148
2149       while (elements) {
2150         GstElement *element = (GstElement *) elements->data;
2151         elements = g_slist_next (elements);
2152
2153         g_print ("  +- element %s\n", GST_ELEMENT_NAME (element));
2154       }
2155     }
2156   }
2157 }
2158
2159 static void
2160 gst_opt_scheduler_get_property (GObject *object, guint prop_id,
2161                                 GValue *value, GParamSpec *pspec)
2162 {
2163   GstOptScheduler *osched;
2164   
2165   g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
2166
2167   osched = GST_OPT_SCHEDULER (object);
2168
2169   switch (prop_id) {
2170     case ARG_ITERATIONS:
2171       g_value_set_int (value, osched->iterations);
2172       break;
2173     case ARG_MAX_RECURSION:
2174       g_value_set_int (value, osched->max_recursion);
2175       break;
2176     default:
2177       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2178       break;
2179   }
2180 }
2181
2182 static void
2183 gst_opt_scheduler_set_property (GObject *object, guint prop_id,
2184                                 const GValue *value, GParamSpec *pspec)
2185 {
2186   GstOptScheduler *osched;
2187   
2188   g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
2189
2190   osched = GST_OPT_SCHEDULER (object);
2191
2192   switch (prop_id) {
2193     case ARG_ITERATIONS:
2194       osched->iterations = g_value_get_int (value);
2195       break;
2196     case ARG_MAX_RECURSION:
2197       osched->max_recursion = g_value_get_int (value);
2198       break;
2199     default:
2200       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2201       break;
2202   }
2203 }