First THREADED backport attempt, focusing on adding locks and making sure the API...
[platform/upstream/gstreamer.git] / gst / schedulers / gstoptimalscheduler.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstoptimalscheduler.c: Default scheduling code for most cases
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #include <gst/gst.h>
28
29 GST_DEBUG_CATEGORY_STATIC (debug_scheduler);
30 #define GST_CAT_DEFAULT debug_scheduler
31
32 #ifdef USE_COTHREADS
33 # include "cothreads_compat.h"
34 #else
35 # define COTHREADS_NAME_CAPITAL ""
36 # define COTHREADS_NAME         ""
37 #endif
38
39 #define GST_ELEMENT_SCHED_CONTEXT(elem)         ((GstOptSchedulerCtx*) (GST_ELEMENT (elem)->sched_private))
40 #define GST_ELEMENT_SCHED_GROUP(elem)           (GST_ELEMENT_SCHED_CONTEXT (elem)->group)
41 /* need this first macro to not run into lvalue casts */
42 #define GST_PAD_DATAPEN(pad)                    (GST_REAL_PAD(pad)->sched_private)
43 #define GST_PAD_DATALIST(pad)                   ((GList*) GST_PAD_DATAPEN(pad))
44
45 #define GST_ELEMENT_COTHREAD_STOPPING                   GST_ELEMENT_SCHEDULER_PRIVATE1
46 #define GST_ELEMENT_IS_COTHREAD_STOPPING(element)       GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
47 #define GST_ELEMENT_VISITED                             GST_ELEMENT_SCHEDULER_PRIVATE2
48 #define GST_ELEMENT_IS_VISITED(element)                 GST_FLAG_IS_SET((element), GST_ELEMENT_VISITED)
49 #define GST_ELEMENT_SET_VISITED(element)                GST_FLAG_SET((element), GST_ELEMENT_VISITED)
50 #define GST_ELEMENT_UNSET_VISITED(element)              GST_FLAG_UNSET((element), GST_ELEMENT_VISITED)
51
52 typedef struct _GstOptScheduler GstOptScheduler;
53 typedef struct _GstOptSchedulerClass GstOptSchedulerClass;
54
55 #define GST_TYPE_OPT_SCHEDULER \
56   (gst_opt_scheduler_get_type())
57 #define GST_OPT_SCHEDULER(obj) \
58   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_OPT_SCHEDULER,GstOptScheduler))
59 #define GST_OPT_SCHEDULER_CLASS(klass) \
60   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_OPT_SCHEDULER,GstOptSchedulerClass))
61 #define GST_IS_OPT_SCHEDULER(obj) \
62   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_OPT_SCHEDULER))
63 #define GST_IS_OPT_SCHEDULER_CLASS(obj) \
64   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_OPT_SCHEDULER))
65
66 typedef enum
67 {
68   GST_OPT_SCHEDULER_STATE_NONE,
69   GST_OPT_SCHEDULER_STATE_STOPPED,
70   GST_OPT_SCHEDULER_STATE_ERROR,
71   GST_OPT_SCHEDULER_STATE_RUNNING,
72   GST_OPT_SCHEDULER_STATE_INTERRUPTED
73 }
74 GstOptSchedulerState;
75
76 #define GST_OPT_LOCK(sched)     (g_static_rec_mutex_lock (&((GstOptScheduler *)sched)->lock))
77 #define GST_OPT_UNLOCK(sched)   (g_static_rec_mutex_unlock (&((GstOptScheduler *)sched)->lock))
78
79 struct _GstOptScheduler
80 {
81   GstScheduler parent;
82
83   GStaticRecMutex lock;
84
85   GstOptSchedulerState state;
86
87 #ifdef USE_COTHREADS
88   cothread_context *context;
89 #endif
90   gint iterations;
91
92   GSList *elements;
93   GSList *chains;
94
95   GList *runqueue;
96   gint recursion;
97
98   gint max_recursion;
99   gint live_groups;
100   gint live_chains;
101   gint live_links;
102 };
103
104 struct _GstOptSchedulerClass
105 {
106   GstSchedulerClass parent_class;
107 };
108
109 static GType _gst_opt_scheduler_type = 0;
110
111 typedef enum
112 {
113   GST_OPT_SCHEDULER_CHAIN_DIRTY = (1 << 1),
114   GST_OPT_SCHEDULER_CHAIN_DISABLED = (1 << 2),
115   GST_OPT_SCHEDULER_CHAIN_RUNNING = (1 << 3)
116 }
117 GstOptSchedulerChainFlags;
118
119 #define GST_OPT_SCHEDULER_CHAIN_SET_DIRTY(chain)        ((chain)->flags |= GST_OPT_SCHEDULER_CHAIN_DIRTY)
120 #define GST_OPT_SCHEDULER_CHAIN_SET_CLEAN(chain)        ((chain)->flags &= ~GST_OPT_SCHEDULER_CHAIN_DIRTY)
121 #define GST_OPT_SCHEDULER_CHAIN_IS_DIRTY(chain)         ((chain)->flags & GST_OPT_SCHEDULER_CHAIN_DIRTY)
122
123 #define GST_OPT_SCHEDULER_CHAIN_DISABLE(chain)          ((chain)->flags |= GST_OPT_SCHEDULER_CHAIN_DISABLED)
124 #define GST_OPT_SCHEDULER_CHAIN_ENABLE(chain)           ((chain)->flags &= ~GST_OPT_SCHEDULER_CHAIN_DISABLED)
125 #define GST_OPT_SCHEDULER_CHAIN_IS_DISABLED(chain)      ((chain)->flags & GST_OPT_SCHEDULER_CHAIN_DISABLED)
126
127 typedef struct _GstOptSchedulerChain GstOptSchedulerChain;
128
129 struct _GstOptSchedulerChain
130 {
131   gint refcount;
132
133   GstOptScheduler *sched;
134
135   GstOptSchedulerChainFlags flags;
136
137   GSList *groups;               /* the groups in this chain */
138   gint num_groups;
139   gint num_enabled;
140 };
141
142 /* 
143  * elements that are scheduled in one cothread 
144  */
145 typedef enum
146 {
147   GST_OPT_SCHEDULER_GROUP_DIRTY = (1 << 1),     /* this group has been modified */
148   GST_OPT_SCHEDULER_GROUP_COTHREAD_STOPPING = (1 << 2), /* the group's cothread stops after one iteration */
149   GST_OPT_SCHEDULER_GROUP_DISABLED = (1 << 3),  /* this group is disabled */
150   GST_OPT_SCHEDULER_GROUP_RUNNING = (1 << 4),   /* this group is running */
151   GST_OPT_SCHEDULER_GROUP_SCHEDULABLE = (1 << 5),       /* this group is schedulable */
152   GST_OPT_SCHEDULER_GROUP_VISITED = (1 << 6)    /* this group is visited when finding links */
153 }
154 GstOptSchedulerGroupFlags;
155
156 typedef enum
157 {
158   GST_OPT_SCHEDULER_GROUP_UNKNOWN = 3,
159   GST_OPT_SCHEDULER_GROUP_GET = 1,
160   GST_OPT_SCHEDULER_GROUP_LOOP = 2
161 }
162 GstOptSchedulerGroupType;
163
164 #define GST_OPT_SCHEDULER_GROUP_SET_FLAG(group,flag)    ((group)->flags |= (flag))
165 #define GST_OPT_SCHEDULER_GROUP_UNSET_FLAG(group,flag)  ((group)->flags &= ~(flag))
166 #define GST_OPT_SCHEDULER_GROUP_IS_FLAG_SET(group,flag) ((group)->flags & (flag))
167
168 #define GST_OPT_SCHEDULER_GROUP_DISABLE(group)          ((group)->flags |= GST_OPT_SCHEDULER_GROUP_DISABLED)
169 #define GST_OPT_SCHEDULER_GROUP_ENABLE(group)           ((group)->flags &= ~GST_OPT_SCHEDULER_GROUP_DISABLED)
170 #define GST_OPT_SCHEDULER_GROUP_IS_ENABLED(group)       (!((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED))
171 #define GST_OPT_SCHEDULER_GROUP_IS_DISABLED(group)      ((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED)
172
173
174 typedef struct _GstOptSchedulerGroup GstOptSchedulerGroup;
175 typedef struct _GstOptSchedulerGroupLink GstOptSchedulerGroupLink;
176
177 /* used to keep track of links with other groups */
178 struct _GstOptSchedulerGroupLink
179 {
180   GstOptSchedulerGroup *src;    /* the group we are linked with */
181   GstOptSchedulerGroup *sink;   /* the group we are linked with */
182   gint count;                   /* the number of links with the group */
183 };
184
185 #define IS_GROUP_LINK(link, srcg, sinkg)        ((link->src == srcg && link->sink == sinkg) || \
186                                                  (link->sink == srcg && link->src == sinkg))
187 #define OTHER_GROUP_LINK(link, group)           (link->src == group ? link->sink : link->src)
188
189 typedef int (*GroupScheduleFunction) (int argc, char *argv[]);
190
191 struct _GstOptSchedulerGroup
192 {
193   GstOptSchedulerChain *chain;  /* the chain this group belongs to */
194   GstOptSchedulerGroupFlags flags;      /* flags for this group */
195   GstOptSchedulerGroupType type;        /* flags for this group */
196   GstOptScheduler *sched;       /* the scheduler */
197
198   gint refcount;
199
200   GSList *elements;             /* elements of this group */
201   gint num_elements;
202   gint num_enabled;
203   GstElement *entry;            /* the group's entry point */
204
205   GSList *group_links;          /* other groups that are linked with this group */
206
207 #ifdef USE_COTHREADS
208   cothread *cothread;           /* the cothread of this group */
209 #else
210   GroupScheduleFunction schedulefunc;
211 #endif
212   int argc;
213   char **argv;
214 };
215
216 /* 
217  * A group is a set of elements through which data can flow without switching
218  * cothreads or without invoking the scheduler's run queue.
219  */
220 static GstOptSchedulerGroup *ref_group (GstOptSchedulerGroup * group);
221 static GstOptSchedulerGroup *unref_group (GstOptSchedulerGroup * group);
222 static GstOptSchedulerGroup *create_group (GstOptSchedulerChain * chain,
223     GstElement * element, GstOptSchedulerGroupType type);
224 static void destroy_group (GstOptSchedulerGroup * group);
225 static GstOptSchedulerGroup *add_to_group (GstOptSchedulerGroup * group,
226     GstElement * element, gboolean with_links);
227 static GstOptSchedulerGroup *remove_from_group (GstOptSchedulerGroup * group,
228     GstElement * element);
229 static void group_dec_links_for_element (GstOptSchedulerGroup * group,
230     GstElement * element);
231 static void group_inc_links_for_element (GstOptSchedulerGroup * group,
232     GstElement * element);
233 static GstOptSchedulerGroup *merge_groups (GstOptSchedulerGroup * group1,
234     GstOptSchedulerGroup * group2);
235 static void setup_group_scheduler (GstOptScheduler * osched,
236     GstOptSchedulerGroup * group);
237 static void destroy_group_scheduler (GstOptSchedulerGroup * group);
238 static void group_error_handler (GstOptSchedulerGroup * group);
239 static void group_element_set_enabled (GstOptSchedulerGroup * group,
240     GstElement * element, gboolean enabled);
241 static gboolean schedule_group (GstOptSchedulerGroup * group);
242 static void get_group (GstElement * element, GstOptSchedulerGroup ** group);
243
244
245 /* 
246  * A chain is a set of groups that are linked to each other.
247  */
248 static void destroy_chain (GstOptSchedulerChain * chain);
249 static GstOptSchedulerChain *create_chain (GstOptScheduler * osched);
250 static GstOptSchedulerChain *ref_chain (GstOptSchedulerChain * chain);
251 static GstOptSchedulerChain *unref_chain (GstOptSchedulerChain * chain);
252 static GstOptSchedulerChain *add_to_chain (GstOptSchedulerChain * chain,
253     GstOptSchedulerGroup * group);
254 static GstOptSchedulerChain *remove_from_chain (GstOptSchedulerChain * chain,
255     GstOptSchedulerGroup * group);
256 static GstOptSchedulerChain *merge_chains (GstOptSchedulerChain * chain1,
257     GstOptSchedulerChain * chain2);
258 static void chain_recursively_migrate_group (GstOptSchedulerChain * chain,
259     GstOptSchedulerGroup * group);
260 static void chain_group_set_enabled (GstOptSchedulerChain * chain,
261     GstOptSchedulerGroup * group, gboolean enabled);
262 static gboolean schedule_chain (GstOptSchedulerChain * chain);
263
264
265 /*
266  * The schedule functions are the entry points for cothreads, or called directly
267  * by gst_opt_scheduler_schedule_run_queue
268  */
269 static int get_group_schedule_function (int argc, char *argv[]);
270 static int loop_group_schedule_function (int argc, char *argv[]);
271 static int unknown_group_schedule_function (int argc, char *argv[]);
272
273
274 /*
275  * These wrappers are set on the pads as the chain handler (what happens when
276  * gst_pad_push is called) or get handler (for gst_pad_pull).
277  */
278 static void gst_opt_scheduler_loop_wrapper (GstPad * sinkpad, GstData * data);
279 static GstData *gst_opt_scheduler_get_wrapper (GstPad * srcpad);
280
281
282 /*
283  * Without cothreads, gst_pad_push or gst_pad_pull on a loop-based group will
284  * just queue the peer element on a list. We need to actually run the queue
285  * instead of relying on cothreads to do the switch for us.
286  */
287 #ifndef USE_COTHREADS
288 static void gst_opt_scheduler_schedule_run_queue (GstOptScheduler * osched,
289     GstOptSchedulerGroup * only_group);
290 #endif
291
292
293 /* 
294  * Scheduler private data for an element 
295  */
296 typedef struct _GstOptSchedulerCtx GstOptSchedulerCtx;
297
298 typedef enum
299 {
300   GST_OPT_SCHEDULER_CTX_DISABLED = (1 << 1)     /* the element is disabled */
301 }
302 GstOptSchedulerCtxFlags;
303
304 struct _GstOptSchedulerCtx
305 {
306   GstOptSchedulerGroup *group;  /* the group this element belongs to */
307
308   GstOptSchedulerCtxFlags flags;        /* flags for this element */
309 };
310
311
312 /*
313  * Implementation of GstScheduler
314  */
315 enum
316 {
317   ARG_0,
318   ARG_ITERATIONS,
319   ARG_MAX_RECURSION
320 };
321
322 static void gst_opt_scheduler_class_init (GstOptSchedulerClass * klass);
323 static void gst_opt_scheduler_init (GstOptScheduler * scheduler);
324
325 static void gst_opt_scheduler_set_property (GObject * object, guint prop_id,
326     const GValue * value, GParamSpec * pspec);
327 static void gst_opt_scheduler_get_property (GObject * object, guint prop_id,
328     GValue * value, GParamSpec * pspec);
329
330 static void gst_opt_scheduler_dispose (GObject * object);
331 static void gst_opt_scheduler_finalize (GObject * object);
332
333 static void gst_opt_scheduler_setup (GstScheduler * sched);
334 static void gst_opt_scheduler_reset (GstScheduler * sched);
335 static void gst_opt_scheduler_add_element (GstScheduler * sched,
336     GstElement * element);
337 static void gst_opt_scheduler_remove_element (GstScheduler * sched,
338     GstElement * element);
339 static GstElementStateReturn gst_opt_scheduler_state_transition (GstScheduler *
340     sched, GstElement * element, gint transition);
341 static void gst_opt_scheduler_scheduling_change (GstScheduler * sched,
342     GstElement * element);
343 static gboolean gst_opt_scheduler_yield (GstScheduler * sched,
344     GstElement * element);
345 static gboolean gst_opt_scheduler_interrupt (GstScheduler * sched,
346     GstElement * element);
347 static void gst_opt_scheduler_error (GstScheduler * sched,
348     GstElement * element);
349 static void gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
350     GstPad * sinkpad);
351 static void gst_opt_scheduler_pad_unlink (GstScheduler * sched, GstPad * srcpad,
352     GstPad * sinkpad);
353 static GstSchedulerState gst_opt_scheduler_iterate (GstScheduler * sched);
354
355 static void gst_opt_scheduler_show (GstScheduler * sched);
356
357 static GstSchedulerClass *parent_class = NULL;
358
359 static GType
360 gst_opt_scheduler_get_type (void)
361 {
362   if (!_gst_opt_scheduler_type) {
363     static const GTypeInfo scheduler_info = {
364       sizeof (GstOptSchedulerClass),
365       NULL,
366       NULL,
367       (GClassInitFunc) gst_opt_scheduler_class_init,
368       NULL,
369       NULL,
370       sizeof (GstOptScheduler),
371       0,
372       (GInstanceInitFunc) gst_opt_scheduler_init,
373       NULL
374     };
375
376     _gst_opt_scheduler_type = g_type_register_static (GST_TYPE_SCHEDULER,
377         "GstOpt" COTHREADS_NAME_CAPITAL "Scheduler", &scheduler_info, 0);
378   }
379   return _gst_opt_scheduler_type;
380 }
381
382 static void
383 gst_opt_scheduler_class_init (GstOptSchedulerClass * klass)
384 {
385   GObjectClass *gobject_class;
386   GstObjectClass *gstobject_class;
387   GstSchedulerClass *gstscheduler_class;
388
389   gobject_class = (GObjectClass *) klass;
390   gstobject_class = (GstObjectClass *) klass;
391   gstscheduler_class = (GstSchedulerClass *) klass;
392
393   parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
394
395   gobject_class->set_property =
396       GST_DEBUG_FUNCPTR (gst_opt_scheduler_set_property);
397   gobject_class->get_property =
398       GST_DEBUG_FUNCPTR (gst_opt_scheduler_get_property);
399   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_opt_scheduler_dispose);
400   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_opt_scheduler_finalize);
401
402   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ITERATIONS,
403       g_param_spec_int ("iterations", "Iterations",
404           "Number of groups to schedule in one iteration (-1 == until EOS/error)",
405           -1, G_MAXINT, 1, G_PARAM_READWRITE));
406 #ifndef USE_COTHREADS
407   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_RECURSION,
408       g_param_spec_int ("max_recursion", "Max recursion",
409           "Maximum number of recursions", 1, G_MAXINT, 100, G_PARAM_READWRITE));
410 #endif
411
412   gstscheduler_class->setup = GST_DEBUG_FUNCPTR (gst_opt_scheduler_setup);
413   gstscheduler_class->reset = GST_DEBUG_FUNCPTR (gst_opt_scheduler_reset);
414   gstscheduler_class->add_element =
415       GST_DEBUG_FUNCPTR (gst_opt_scheduler_add_element);
416   gstscheduler_class->remove_element =
417       GST_DEBUG_FUNCPTR (gst_opt_scheduler_remove_element);
418   gstscheduler_class->state_transition =
419       GST_DEBUG_FUNCPTR (gst_opt_scheduler_state_transition);
420   gstscheduler_class->scheduling_change =
421       GST_DEBUG_FUNCPTR (gst_opt_scheduler_scheduling_change);
422   gstscheduler_class->yield = GST_DEBUG_FUNCPTR (gst_opt_scheduler_yield);
423   gstscheduler_class->interrupt =
424       GST_DEBUG_FUNCPTR (gst_opt_scheduler_interrupt);
425   gstscheduler_class->error = GST_DEBUG_FUNCPTR (gst_opt_scheduler_error);
426   gstscheduler_class->pad_link = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_link);
427   gstscheduler_class->pad_unlink =
428       GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_unlink);
429   gstscheduler_class->clock_wait = NULL;
430   gstscheduler_class->iterate = GST_DEBUG_FUNCPTR (gst_opt_scheduler_iterate);
431   gstscheduler_class->show = GST_DEBUG_FUNCPTR (gst_opt_scheduler_show);
432
433 #ifdef USE_COTHREADS
434   do_cothreads_init (NULL);
435 #endif
436 }
437
438 static void
439 gst_opt_scheduler_init (GstOptScheduler * scheduler)
440 {
441   g_static_rec_mutex_init (&scheduler->lock);
442
443   scheduler->elements = NULL;
444   scheduler->iterations = 1;
445   scheduler->max_recursion = 100;
446   scheduler->live_groups = 0;
447   scheduler->live_chains = 0;
448   scheduler->live_links = 0;
449 }
450
451 static void
452 gst_opt_scheduler_dispose (GObject * object)
453 {
454   G_OBJECT_CLASS (parent_class)->dispose (object);
455 }
456
457 static void
458 gst_opt_scheduler_finalize (GObject * object)
459 {
460   GstOptScheduler *osched = GST_OPT_SCHEDULER (object);
461
462   g_static_rec_mutex_free (&osched->lock);
463
464   G_OBJECT_CLASS (parent_class)->finalize (object);
465 }
466
467 static gboolean
468 plugin_init (GstPlugin * plugin)
469 {
470 #ifdef USE_COTHREADS
471   if (!gst_scheduler_register (plugin, "opt" COTHREADS_NAME,
472           "An optimal scheduler using " COTHREADS_NAME " cothreads",
473           gst_opt_scheduler_get_type ()))
474 #else
475   if (!gst_scheduler_register (plugin, "opt",
476           "An optimal scheduler using no cothreads",
477           gst_opt_scheduler_get_type ()))
478 #endif
479     return FALSE;
480
481   GST_DEBUG_CATEGORY_INIT (debug_scheduler, "scheduler", 0,
482       "optimal scheduler");
483
484   return TRUE;
485 }
486
487 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
488     GST_VERSION_MINOR,
489     "gstopt" COTHREADS_NAME "scheduler",
490     "An optimal scheduler using " COTHREADS_NAME " cothreads",
491     plugin_init, VERSION, GST_LICENSE, GST_PACKAGE, GST_ORIGIN);
492
493
494 static GstOptSchedulerChain *
495 ref_chain (GstOptSchedulerChain * chain)
496 {
497   GST_LOG ("ref chain %p %d->%d", chain, chain->refcount, chain->refcount + 1);
498   chain->refcount++;
499
500   return chain;
501 }
502
503 static GstOptSchedulerChain *
504 unref_chain (GstOptSchedulerChain * chain)
505 {
506   GST_LOG ("unref chain %p %d->%d", chain,
507       chain->refcount, chain->refcount - 1);
508
509   if (--chain->refcount == 0) {
510     destroy_chain (chain);
511     chain = NULL;
512   }
513
514   return chain;
515 }
516
517 static GstOptSchedulerChain *
518 create_chain (GstOptScheduler * osched)
519 {
520   GstOptSchedulerChain *chain;
521
522   chain = g_new0 (GstOptSchedulerChain, 1);
523   chain->sched = osched;
524   chain->refcount = 1;
525   chain->flags = GST_OPT_SCHEDULER_CHAIN_DISABLED;
526   osched->live_chains++;
527
528   gst_object_ref (GST_OBJECT (osched));
529   osched->chains = g_slist_prepend (osched->chains, chain);
530
531   GST_LOG ("new chain %p, %d live chains now", chain, osched->live_chains);
532
533   return chain;
534 }
535
536 static void
537 destroy_chain (GstOptSchedulerChain * chain)
538 {
539   GstOptScheduler *osched;
540
541   GST_LOG ("destroy chain %p", chain);
542
543   g_assert (chain->num_groups == 0);
544   g_assert (chain->groups == NULL);
545
546   osched = chain->sched;
547   osched->chains = g_slist_remove (osched->chains, chain);
548   osched->live_chains--;
549
550   GST_LOG ("%d live chains now", osched->live_chains);
551
552   gst_object_unref (GST_OBJECT (osched));
553
554   g_free (chain);
555 }
556
557 static GstOptSchedulerChain *
558 add_to_chain (GstOptSchedulerChain * chain, GstOptSchedulerGroup * group)
559 {
560   gboolean enabled;
561
562   GST_LOG ("adding group %p to chain %p", group, chain);
563
564   g_assert (group->chain == NULL);
565
566   group = ref_group (group);
567
568   group->chain = ref_chain (chain);
569   chain->groups = g_slist_prepend (chain->groups, group);
570   chain->num_groups++;
571
572   enabled = GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group);
573
574   if (enabled) {
575     /* we can now setup the scheduling of the group */
576     setup_group_scheduler (chain->sched, group);
577
578     chain->num_enabled++;
579     if (chain->num_enabled == chain->num_groups) {
580       GST_LOG ("enabling chain %p after adding of enabled group", chain);
581       GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
582     }
583   }
584
585   /* queue a resort of the group list, which determines which group will be run
586    * first. */
587   GST_OPT_SCHEDULER_CHAIN_SET_DIRTY (chain);
588
589   return chain;
590 }
591
592 static GstOptSchedulerChain *
593 remove_from_chain (GstOptSchedulerChain * chain, GstOptSchedulerGroup * group)
594 {
595   gboolean enabled;
596
597   GST_LOG ("removing group %p from chain %p", group, chain);
598
599   if (!chain)
600     return NULL;
601
602   g_assert (group);
603   g_assert (group->chain == chain);
604
605   enabled = GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group);
606
607   group->chain = NULL;
608   chain->groups = g_slist_remove (chain->groups, group);
609   chain->num_groups--;
610   unref_group (group);
611
612   if (chain->num_groups == 0)
613     chain = unref_chain (chain);
614   else {
615     /* removing an enabled group from the chain decrements the 
616      * enabled counter */
617     if (enabled) {
618       chain->num_enabled--;
619       if (chain->num_enabled == 0) {
620         GST_LOG ("disabling chain %p after removal of the only enabled group",
621             chain);
622         GST_OPT_SCHEDULER_CHAIN_DISABLE (chain);
623       }
624     } else {
625       if (chain->num_enabled == chain->num_groups) {
626         GST_LOG ("enabling chain %p after removal of the only disabled group",
627             chain);
628         GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
629       }
630     }
631   }
632
633   GST_OPT_SCHEDULER_CHAIN_SET_DIRTY (chain);
634
635   chain = unref_chain (chain);
636   return chain;
637 }
638
639 static GstOptSchedulerChain *
640 merge_chains (GstOptSchedulerChain * chain1, GstOptSchedulerChain * chain2)
641 {
642   GSList *walk;
643
644   g_assert (chain1 != NULL);
645
646   GST_LOG ("merging chain %p and %p", chain1, chain2);
647
648   /* FIXME: document how chain2 can be NULL */
649   if (chain1 == chain2 || chain2 == NULL)
650     return chain1;
651
652   /* switch if it's more efficient */
653   if (chain1->num_groups < chain2->num_groups) {
654     GstOptSchedulerChain *tmp = chain2;
655
656     chain2 = chain1;
657     chain1 = tmp;
658   }
659
660   walk = chain2->groups;
661   while (walk) {
662     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
663
664     walk = g_slist_next (walk);
665
666     GST_LOG ("reparenting group %p from chain %p to %p", group, chain2, chain1);
667
668     ref_group (group);
669
670     remove_from_chain (chain2, group);
671     add_to_chain (chain1, group);
672
673     unref_group (group);
674   }
675
676   /* chain2 is now freed, if nothing else was referencing it before */
677
678   return chain1;
679 }
680
681 /* sorts the group list so that terminal sinks come first -- prevents pileup of
682  * datas in datapens */
683 static void
684 sort_chain (GstOptSchedulerChain * chain)
685 {
686   GSList *original = chain->groups;
687   GSList *new = NULL;
688   GSList *walk, *links, *this;
689
690   /* if there's only one group, just return */
691   if (!original->next)
692     return;
693   /* otherwise, we know that all groups are somehow linked together */
694
695   GST_LOG ("sorting chain %p (%d groups)", chain, g_slist_length (original));
696
697   /* first find the terminal sinks */
698   for (walk = original; walk;) {
699     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
700
701     this = walk;
702     walk = walk->next;
703     if (group->group_links) {
704       gboolean is_sink = TRUE;
705
706       for (links = group->group_links; links; links = links->next)
707         if (((GstOptSchedulerGroupLink *) links->data)->src == group)
708           is_sink = FALSE;
709       if (is_sink) {
710         /* found one */
711         original = g_slist_remove_link (original, this);
712         new = g_slist_concat (new, this);
713       }
714     }
715   }
716   g_assert (new != NULL);
717
718   /* now look for the elements that are linked to the terminal sinks */
719   for (walk = new; walk; walk = walk->next) {
720     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
721
722     for (links = group->group_links; links; links = links->next) {
723       this =
724           g_slist_find (original,
725           ((GstOptSchedulerGroupLink *) links->data)->src);
726       if (this) {
727         original = g_slist_remove_link (original, this);
728         new = g_slist_concat (new, this);
729       }
730     }
731   }
732   g_assert (original == NULL);
733
734   chain->groups = new;
735 }
736
737 static void
738 chain_group_set_enabled (GstOptSchedulerChain * chain,
739     GstOptSchedulerGroup * group, gboolean enabled)
740 {
741   gboolean oldstate;
742
743   g_assert (group != NULL);
744   g_assert (chain != NULL);
745
746   GST_LOG
747       ("request to %d group %p in chain %p, have %d groups enabled out of %d",
748       enabled, group, chain, chain->num_enabled, chain->num_groups);
749
750   oldstate = (GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group) ? TRUE : FALSE);
751   if (oldstate == enabled) {
752     GST_LOG ("group %p in chain %p was in correct state", group, chain);
753     return;
754   }
755
756   if (enabled)
757     GST_OPT_SCHEDULER_GROUP_ENABLE (group);
758   else
759     GST_OPT_SCHEDULER_GROUP_DISABLE (group);
760
761   if (enabled) {
762     g_assert (chain->num_enabled < chain->num_groups);
763
764     chain->num_enabled++;
765
766     GST_DEBUG ("enable group %p in chain %p, now %d groups enabled out of %d",
767         group, chain, chain->num_enabled, chain->num_groups);
768
769     /* OK to call even if the scheduler (cothread context / schedulerfunc) was
770        setup already -- will get destroyed when the group is destroyed */
771     setup_group_scheduler (chain->sched, group);
772
773     if (chain->num_enabled == chain->num_groups) {
774       GST_DEBUG ("enable chain %p", chain);
775       GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
776     }
777   } else {
778     g_assert (chain->num_enabled > 0);
779
780     chain->num_enabled--;
781     GST_DEBUG ("disable group %p in chain %p, now %d groups enabled out of %d",
782         group, chain, chain->num_enabled, chain->num_groups);
783
784     if (chain->num_enabled == 0) {
785       GST_DEBUG ("disable chain %p", chain);
786       GST_OPT_SCHEDULER_CHAIN_DISABLE (chain);
787     }
788   }
789 }
790
791 /* recursively migrate the group and all connected groups into the new chain */
792 static void
793 chain_recursively_migrate_group (GstOptSchedulerChain * chain,
794     GstOptSchedulerGroup * group)
795 {
796   GSList *links;
797
798   /* group already in chain */
799   if (group->chain == chain)
800     return;
801
802   /* first remove the group from its old chain */
803   remove_from_chain (group->chain, group);
804   /* add to new chain */
805   add_to_chain (chain, group);
806
807   /* then follow all links */
808   for (links = group->group_links; links; links = g_slist_next (links)) {
809     GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) links->data;
810
811     chain_recursively_migrate_group (chain, OTHER_GROUP_LINK (link, group));
812   }
813 }
814
815 static GstOptSchedulerGroup *
816 ref_group (GstOptSchedulerGroup * group)
817 {
818   GST_LOG ("ref group %p %d->%d", group, group->refcount, group->refcount + 1);
819
820   group->refcount++;
821
822   return group;
823 }
824
825 static GstOptSchedulerGroup *
826 unref_group (GstOptSchedulerGroup * group)
827 {
828   GST_LOG ("unref group %p %d->%d", group,
829       group->refcount, group->refcount - 1);
830
831   if (--group->refcount == 0) {
832     destroy_group (group);
833     group = NULL;
834   }
835
836   return group;
837 }
838
839 static GstOptSchedulerGroup *
840 create_group (GstOptSchedulerChain * chain, GstElement * element,
841     GstOptSchedulerGroupType type)
842 {
843   GstOptSchedulerGroup *group;
844
845   group = g_new0 (GstOptSchedulerGroup, 1);
846   GST_LOG ("new group %p, type %d", group, type);
847   group->refcount = 1;          /* float... */
848   group->flags = GST_OPT_SCHEDULER_GROUP_DISABLED;
849   group->type = type;
850   group->sched = chain->sched;
851   group->sched->live_groups++;
852
853   add_to_group (group, element, FALSE);
854   add_to_chain (chain, group);
855   group = unref_group (group);  /* ...and sink. */
856
857   GST_LOG ("%d live groups now", group->sched->live_groups);
858   /* group's refcount is now 2 (one for the element, one for the chain) */
859
860   return group;
861 }
862
863 static void
864 destroy_group (GstOptSchedulerGroup * group)
865 {
866   GST_LOG ("destroy group %p", group);
867
868   g_assert (group != NULL);
869   g_assert (group->elements == NULL);
870   g_assert (group->chain == NULL);
871   g_assert (group->group_links == NULL);
872
873   if (group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)
874     destroy_group_scheduler (group);
875
876   group->sched->live_groups--;
877   GST_LOG ("%d live groups now", group->sched->live_groups);
878
879   g_free (group);
880 }
881
882 static GstOptSchedulerGroup *
883 add_to_group (GstOptSchedulerGroup * group, GstElement * element,
884     gboolean with_links)
885 {
886   g_assert (group != NULL);
887   g_assert (element != NULL);
888
889   GST_DEBUG ("adding element %p \"%s\" to group %p", element,
890       GST_ELEMENT_NAME (element), group);
891
892   if (GST_ELEMENT_IS_DECOUPLED (element)) {
893     GST_DEBUG ("element \"%s\" is decoupled, not adding to group %p",
894         GST_ELEMENT_NAME (element), group);
895     return group;
896   }
897
898   g_assert (GST_ELEMENT_SCHED_GROUP (element) == NULL);
899
900   /* first increment the links that this group has with other groups through
901    * this element */
902   if (with_links)
903     group_inc_links_for_element (group, element);
904
905   /* Ref the group... */
906   GST_ELEMENT_SCHED_GROUP (element) = ref_group (group);
907
908   gst_object_ref (GST_OBJECT (element));
909   group->elements = g_slist_prepend (group->elements, element);
910   group->num_elements++;
911
912   if (gst_element_get_state (element) == GST_STATE_PLAYING) {
913     group_element_set_enabled (group, element, TRUE);
914   }
915
916   return group;
917 }
918
919 /* we need to remove a decoupled element from the scheduler, this
920  * involves finding all the groups that have this element as an
921  * entry point and disabling them. */
922 static void
923 remove_decoupled (GstScheduler * sched, GstElement * element)
924 {
925   GSList *chains;
926   GList *schedulers;
927   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
928
929   GST_DEBUG_OBJECT (sched, "removing decoupled element \"%s\"",
930       GST_OBJECT_NAME (element));
931
932   for (chains = osched->chains; chains; chains = g_slist_next (chains)) {
933     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
934     GSList *groups;
935
936     for (groups = chain->groups; groups; groups = g_slist_next (groups)) {
937       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
938
939       if (group->entry) {
940         GST_DEBUG_OBJECT (sched, "group %p, entry %s", group,
941             GST_STR_NULL (GST_OBJECT_NAME (group->entry)));
942       }
943
944       if (group->entry == element) {
945         GST_DEBUG ("clearing element %p \"%s\" as entry from group %p",
946             element, GST_ELEMENT_NAME (element), group);
947         group->entry = NULL;
948         group->type = GST_OPT_SCHEDULER_GROUP_UNKNOWN;
949       }
950     }
951   }
952   /* and remove from any child scheduler */
953   for (schedulers = sched->schedulers; schedulers;
954       schedulers = g_list_next (schedulers)) {
955     remove_decoupled (GST_SCHEDULER (schedulers->data), element);
956   }
957 }
958
959 static GstOptSchedulerGroup *
960 remove_from_group (GstOptSchedulerGroup * group, GstElement * element)
961 {
962   GST_DEBUG ("removing element %p \"%s\" from group %p",
963       element, GST_ELEMENT_NAME (element), group);
964
965   g_assert (group != NULL);
966   g_assert (element != NULL);
967   /* this assert also catches the decoupled elements */
968   g_assert (GST_ELEMENT_SCHED_GROUP (element) == group);
969
970   /* first decrement the links that this group has with other groups through
971    * this element */
972   group_dec_links_for_element (group, element);
973
974   if (gst_element_get_state (element) == GST_STATE_PLAYING) {
975     group_element_set_enabled (group, element, FALSE);
976   }
977
978   group->elements = g_slist_remove (group->elements, element);
979   group->num_elements--;
980
981   /* if the element was an entry point in the group, clear the group's
982    * entry point, and mark it as unknown */
983   if (group->entry == element) {
984     GST_DEBUG ("clearing element %p \"%s\" as entry from group %p",
985         element, GST_ELEMENT_NAME (element), group);
986     group->entry = NULL;
987     group->type = GST_OPT_SCHEDULER_GROUP_UNKNOWN;
988   }
989
990   GST_ELEMENT_SCHED_GROUP (element) = NULL;
991   gst_object_unref (GST_OBJECT (element));
992
993   if (group->num_elements == 0) {
994     GST_LOG ("group %p is now empty", group);
995     /* don't know in what case group->chain would be NULL, but putting this here
996        in deference to 0.8 -- remove me in 0.9 */
997     if (group->chain) {
998       GST_LOG ("removing group %p from its chain", group);
999       chain_group_set_enabled (group->chain, group, FALSE);
1000       remove_from_chain (group->chain, group);
1001     }
1002   }
1003   group = unref_group (group);
1004
1005   return group;
1006 }
1007
1008 /* check if an element is part of the given group. We have to be carefull
1009  * as decoupled elements are added as entry but are not added to the elements 
1010  * list */
1011 static gboolean
1012 group_has_element (GstOptSchedulerGroup * group, GstElement * element)
1013 {
1014   if (group->entry == element)
1015     return TRUE;
1016
1017   return (g_slist_find (group->elements, element) != NULL);
1018 }
1019
1020 /* FIXME need to check if the groups are of the same type -- otherwise need to
1021    setup the scheduler again, if it is setup */
1022 static GstOptSchedulerGroup *
1023 merge_groups (GstOptSchedulerGroup * group1, GstOptSchedulerGroup * group2)
1024 {
1025   g_assert (group1 != NULL);
1026
1027   GST_DEBUG ("merging groups %p and %p", group1, group2);
1028
1029   if (group1 == group2 || group2 == NULL)
1030     return group1;
1031
1032   /* make sure they end up in the same chain */
1033   merge_chains (group1->chain, group2->chain);
1034
1035   while (group2 && group2->elements) {
1036     GstElement *element = (GstElement *) group2->elements->data;
1037
1038     group2 = remove_from_group (group2, element);
1039     add_to_group (group1, element, TRUE);
1040   }
1041
1042   return group1;
1043 }
1044
1045 /* setup the scheduler context for a group. The right schedule function
1046  * is selected based on the group type and cothreads are created if 
1047  * needed */
1048 static void
1049 setup_group_scheduler (GstOptScheduler * osched, GstOptSchedulerGroup * group)
1050 {
1051   GroupScheduleFunction wrapper;
1052
1053   GST_DEBUG ("setup group %p scheduler, type %d", group, group->type);
1054
1055   wrapper = unknown_group_schedule_function;
1056
1057   /* figure out the wrapper function for this group */
1058   if (group->type == GST_OPT_SCHEDULER_GROUP_GET)
1059     wrapper = get_group_schedule_function;
1060   else if (group->type == GST_OPT_SCHEDULER_GROUP_LOOP)
1061     wrapper = loop_group_schedule_function;
1062
1063 #ifdef USE_COTHREADS
1064   /* we can only initialize the cothread stuff when we have
1065    * a cothread context */
1066   if (osched->context) {
1067     if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
1068       do_cothread_create (group->cothread, osched->context,
1069           (cothread_func) wrapper, 0, (char **) group);
1070     } else {
1071       do_cothread_setfunc (group->cothread, osched->context,
1072           (cothread_func) wrapper, 0, (char **) group);
1073     }
1074   }
1075 #else
1076   group->schedulefunc = wrapper;
1077   group->argc = 0;
1078   group->argv = (char **) group;
1079 #endif
1080   group->flags |= GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
1081 }
1082
1083 static void
1084 destroy_group_scheduler (GstOptSchedulerGroup * group)
1085 {
1086   g_assert (group);
1087
1088   if (group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)
1089     g_warning ("destroying running group scheduler");
1090
1091 #ifdef USE_COTHREADS
1092   if (group->cothread) {
1093     do_cothread_destroy (group->cothread);
1094     group->cothread = NULL;
1095   }
1096 #else
1097   group->schedulefunc = NULL;
1098   group->argc = 0;
1099   group->argv = NULL;
1100 #endif
1101
1102   group->flags &= ~GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
1103 }
1104
1105 static void
1106 group_error_handler (GstOptSchedulerGroup * group)
1107 {
1108   GST_DEBUG ("group %p has errored", group);
1109
1110   chain_group_set_enabled (group->chain, group, FALSE);
1111   group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
1112 }
1113
1114 /* this function enables/disables an element, it will set/clear a flag on the element 
1115  * and tells the chain that the group is enabled if all elements inside the group are
1116  * enabled */
1117 static void
1118 group_element_set_enabled (GstOptSchedulerGroup * group, GstElement * element,
1119     gboolean enabled)
1120 {
1121   g_assert (group != NULL);
1122   g_assert (element != NULL);
1123
1124   GST_LOG
1125       ("request to %d element %s in group %p, have %d elements enabled out of %d",
1126       enabled, GST_ELEMENT_NAME (element), group, group->num_enabled,
1127       group->num_elements);
1128
1129   /* Note that if an unlinked PLAYING element is added to a bin, we have to
1130      create a new group to hold the element, and this function will be called
1131      before the group is added to the chain. Thus we have a valid case for
1132      group->chain==NULL. */
1133
1134   if (enabled) {
1135     g_assert (group->num_enabled < group->num_elements);
1136
1137     group->num_enabled++;
1138
1139     GST_DEBUG
1140         ("enable element %s in group %p, now %d elements enabled out of %d",
1141         GST_ELEMENT_NAME (element), group, group->num_enabled,
1142         group->num_elements);
1143
1144     if (group->num_enabled == group->num_elements) {
1145       if (!group->chain) {
1146         GST_DEBUG ("enable chainless group %p", group);
1147         GST_OPT_SCHEDULER_GROUP_ENABLE (group);
1148       } else {
1149         GST_LOG ("enable group %p", group);
1150         chain_group_set_enabled (group->chain, group, TRUE);
1151       }
1152     }
1153   } else {
1154     g_assert (group->num_enabled > 0);
1155
1156     group->num_enabled--;
1157
1158     GST_DEBUG
1159         ("disable element %s in group %p, now %d elements enabled out of %d",
1160         GST_ELEMENT_NAME (element), group, group->num_enabled,
1161         group->num_elements);
1162
1163     if (group->num_enabled == 0) {
1164       if (!group->chain) {
1165         GST_DEBUG ("disable chainless group %p", group);
1166         GST_OPT_SCHEDULER_GROUP_DISABLE (group);
1167       } else {
1168         GST_LOG ("disable group %p", group);
1169         chain_group_set_enabled (group->chain, group, FALSE);
1170       }
1171     }
1172   }
1173 }
1174
1175 /* a group is scheduled by doing a cothread switch to it or
1176  * by calling the schedule function. In the non-cothread case
1177  * we cannot run already running groups so we return FALSE here
1178  * to indicate this to the caller */
1179 static gboolean
1180 schedule_group (GstOptSchedulerGroup * group)
1181 {
1182   if (!group->entry) {
1183     GST_INFO ("not scheduling group %p without entry", group);
1184     /* FIXME, we return true here, while the group is actually
1185      * not schedulable. We might want to disable the element that caused
1186      * this group to be scheduled instead */
1187     return TRUE;
1188   }
1189 #ifdef USE_COTHREADS
1190   if (group->cothread)
1191     do_cothread_switch (group->cothread);
1192   else
1193     g_warning ("(internal error): trying to schedule group without cothread");
1194   return TRUE;
1195 #else
1196   /* cothreads automatically call the pre- and post-run functions for us;
1197    * without cothreads we need to call them manually */
1198   if (group->schedulefunc == NULL) {
1199     GST_INFO ("not scheduling group %p without schedulefunc", group);
1200     return FALSE;
1201   } else {
1202     GSList *l, *lcopy;
1203     GstElement *entry = NULL;
1204
1205     lcopy = g_slist_copy (group->elements);
1206     /* also add entry point, this is made so that decoupled elements
1207      * are also reffed since they are not added to the list of group
1208      * elements. */
1209     if (group->entry && GST_ELEMENT_IS_DECOUPLED (group->entry)) {
1210       entry = group->entry;
1211       gst_object_ref (GST_OBJECT (entry));
1212     }
1213
1214     for (l = lcopy; l; l = l->next) {
1215       GstElement *e = (GstElement *) l->data;
1216
1217       gst_object_ref (GST_OBJECT (e));
1218       if (e->pre_run_func)
1219         e->pre_run_func (e);
1220     }
1221
1222     group->schedulefunc (group->argc, group->argv);
1223
1224     for (l = lcopy; l; l = l->next) {
1225       GstElement *e = (GstElement *) l->data;
1226
1227       if (e->post_run_func)
1228         e->post_run_func (e);
1229
1230       gst_object_unref (GST_OBJECT (e));
1231     }
1232     if (entry)
1233       gst_object_unref (GST_OBJECT (entry));
1234     g_slist_free (lcopy);
1235   }
1236   return TRUE;
1237 #endif
1238 }
1239
1240 #ifndef USE_COTHREADS
1241 static void
1242 gst_opt_scheduler_schedule_run_queue (GstOptScheduler * osched,
1243     GstOptSchedulerGroup * only_group)
1244 {
1245   GST_LOG_OBJECT (osched, "running queue: %d groups, recursed %d times",
1246       g_list_length (osched->runqueue),
1247       osched->recursion, g_list_length (osched->runqueue));
1248
1249   /* note that we have a ref on each group on the queue (unref after running) */
1250
1251   /* make sure we don't exceed max_recursion */
1252   if (osched->recursion > osched->max_recursion) {
1253     osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
1254     return;
1255   }
1256
1257   osched->recursion++;
1258
1259   do {
1260     GstOptSchedulerGroup *group;
1261     gboolean res;
1262
1263     if (only_group)
1264       group = only_group;
1265     else
1266       group = (GstOptSchedulerGroup *) osched->runqueue->data;
1267
1268     /* runqueue holds refcount to group */
1269     osched->runqueue = g_list_remove (osched->runqueue, group);
1270
1271     GST_LOG_OBJECT (osched, "scheduling group %p", group);
1272
1273     if (GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group)) {
1274       res = schedule_group (group);
1275     } else {
1276       GST_INFO_OBJECT (osched,
1277           "group was disabled while it was on the queue, not scheduling");
1278       res = TRUE;
1279     }
1280     if (!res) {
1281       g_warning ("error scheduling group %p", group);
1282       group_error_handler (group);
1283     } else {
1284       GST_LOG_OBJECT (osched, "done scheduling group %p", group);
1285     }
1286     unref_group (group);
1287   } while (osched->runqueue && !only_group);
1288
1289   GST_LOG_OBJECT (osched, "run queue length after scheduling %d",
1290       g_list_length (osched->runqueue));
1291
1292   osched->recursion--;
1293 }
1294 #endif
1295
1296 /* a chain is scheduled by picking the first active group and scheduling it */
1297 static gboolean
1298 schedule_chain (GstOptSchedulerChain * chain)
1299 {
1300   GSList *groups;
1301   GstOptScheduler *osched;
1302   gboolean scheduled = FALSE;
1303
1304   osched = chain->sched;
1305
1306   /* if the chain has changed, we need to resort the groups so we enter in the
1307      proper place */
1308   if (GST_OPT_SCHEDULER_CHAIN_IS_DIRTY (chain))
1309     sort_chain (chain);
1310   GST_OPT_SCHEDULER_CHAIN_SET_CLEAN (chain);
1311
1312   /* since the lock on the group list is only released when we schedule
1313    * a group and since we only schedule one group, we don't need to 
1314    * worry about the list getting corrupted. */
1315   groups = chain->groups;
1316   while (groups) {
1317     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
1318
1319     if (!GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group)) {
1320       ref_group (group);
1321       GST_LOG ("scheduling group %p in chain %p", group, chain);
1322
1323 #ifdef USE_COTHREADS
1324       schedule_group (group);
1325 #else
1326       osched->recursion = 0;
1327       if (!g_list_find (osched->runqueue, group)) {
1328         ref_group (group);
1329         osched->runqueue = g_list_append (osched->runqueue, group);
1330       }
1331       gst_opt_scheduler_schedule_run_queue (osched, NULL);
1332 #endif
1333
1334       scheduled = TRUE;
1335
1336       GST_LOG ("done scheduling group %p in chain %p", group, chain);
1337       unref_group (group);
1338       /* stop scheduling more groups */
1339       break;
1340     }
1341
1342     groups = g_slist_next (groups);
1343   }
1344   return scheduled;
1345 }
1346
1347 /* this function is inserted in the gethandler when you are not
1348  * supposed to call _pull on the pad. */
1349 static GstData *
1350 get_invalid_call (GstPad * pad)
1351 {
1352   GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
1353       ("get on pad %s:%s but the peer is operating chain based and so is not "
1354           "allowed to pull, fix the element.", GST_DEBUG_PAD_NAME (pad)));
1355
1356   return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1357 }
1358
1359 /* this function is inserted in the chainhandler when you are not
1360  * supposed to call _push on the pad. */
1361 static void
1362 chain_invalid_call (GstPad * pad, GstData * data)
1363 {
1364   GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
1365       ("chain on pad %s:%s but the pad is get based",
1366           GST_DEBUG_PAD_NAME (pad)));
1367
1368   gst_data_unref (data);
1369 }
1370
1371 /* a get-based group is scheduled by getting a buffer from the get based
1372  * entry point and by pushing the buffer to the peer.
1373  * We also set the running flag on this group for as long as this
1374  * function is running. */
1375 static int
1376 get_group_schedule_function (int argc, char *argv[])
1377 {
1378   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1379   GstElement *entry = group->entry;
1380   const GList *pads;
1381   GstOptScheduler *osched;
1382
1383   /* what if the entry point disappeared? */
1384   if (entry == NULL || group->chain == NULL)
1385     return 0;
1386
1387   osched = group->chain->sched;
1388
1389   pads = entry->pads;
1390
1391   GST_LOG ("executing get-based group %p", group);
1392
1393   group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
1394
1395   GST_OPT_UNLOCK (osched);
1396   while (pads) {
1397     GstData *data;
1398     GstPad *pad = GST_PAD (pads->data);
1399
1400     pads = g_list_next (pads);
1401
1402     /* skip sinks and ghostpads */
1403     if (!GST_PAD_IS_SRC (pad) || !GST_IS_REAL_PAD (pad))
1404       continue;
1405
1406     GST_DEBUG ("doing get and push on pad \"%s:%s\" in group %p",
1407         GST_DEBUG_PAD_NAME (pad), group);
1408
1409     data = gst_pad_call_get_function (pad);
1410     if (data) {
1411       if (GST_EVENT_IS_INTERRUPT (data)) {
1412         GST_DEBUG ("unreffing interrupt event %p", data);
1413         gst_event_unref (GST_EVENT (data));
1414         break;
1415       }
1416       gst_pad_push (pad, data);
1417     }
1418   }
1419   GST_OPT_LOCK (osched);
1420
1421   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
1422
1423   return 0;
1424 }
1425
1426 /* a loop-based group is scheduled by calling the loop function
1427  * on the entry point. 
1428  * We also set the running flag on this group for as long as this
1429  * function is running.
1430  * This function should be called with the scheduler lock held. */
1431 static int
1432 loop_group_schedule_function (int argc, char *argv[])
1433 {
1434   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1435   GstElement *entry = group->entry;
1436
1437   GST_LOG ("executing loop-based group %p", group);
1438
1439   group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
1440
1441   GST_DEBUG ("calling loopfunc of element %s in group %p",
1442       GST_ELEMENT_NAME (entry), group);
1443
1444   if (group->chain == NULL)
1445     return 0;
1446
1447   if (entry->loopfunc) {
1448     GstOptScheduler *osched = group->chain->sched;
1449
1450     GST_OPT_UNLOCK (osched);
1451     entry->loopfunc (entry);
1452     GST_OPT_LOCK (osched);
1453   } else
1454     group_error_handler (group);
1455
1456   GST_LOG ("returned from loopfunc of element %s in group %p",
1457       GST_ELEMENT_NAME (entry), group);
1458
1459   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
1460
1461   return 0;
1462
1463 }
1464
1465 /* the function to schedule an unknown group, which just gives an error */
1466 static int
1467 unknown_group_schedule_function (int argc, char *argv[])
1468 {
1469   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1470
1471   g_warning ("(internal error) unknown group type %d, disabling\n",
1472       group->type);
1473   group_error_handler (group);
1474
1475   return 0;
1476 }
1477
1478 /* this function is called when the first element of a chain-loop or a loop-loop
1479  * link performs a push to the loop element. We then schedule the
1480  * group with the loop-based element until the datapen is empty */
1481 static void
1482 gst_opt_scheduler_loop_wrapper (GstPad * sinkpad, GstData * data)
1483 {
1484   GstOptSchedulerGroup *group;
1485   GstOptScheduler *osched;
1486   GstRealPad *peer;
1487
1488   group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (sinkpad));
1489   osched = group->chain->sched;
1490   peer = GST_RPAD_PEER (sinkpad);
1491
1492   GST_LOG ("chain handler for loop-based pad %" GST_PTR_FORMAT, sinkpad);
1493
1494   GST_OPT_LOCK (osched);
1495 #ifdef USE_COTHREADS
1496   if (GST_PAD_DATALIST (peer)) {
1497     g_warning ("deadlock detected, disabling group %p", group);
1498     group_error_handler (group);
1499   } else {
1500     GST_LOG ("queueing data %p on %s:%s's datapen", data,
1501         GST_DEBUG_PAD_NAME (peer));
1502     GST_PAD_DATAPEN (peer) = g_list_append (GST_PAD_DATALIST (peer), data);
1503     schedule_group (group);
1504   }
1505 #else
1506   GST_LOG ("queueing data %p on %s:%s's datapen", data,
1507       GST_DEBUG_PAD_NAME (peer));
1508   GST_PAD_DATAPEN (peer) = g_list_append (GST_PAD_DATALIST (peer), data);
1509   if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
1510     GST_LOG ("adding group %p to runqueue", group);
1511     if (!g_list_find (osched->runqueue, group)) {
1512       ref_group (group);
1513       osched->runqueue = g_list_append (osched->runqueue, group);
1514     }
1515   }
1516 #endif
1517   GST_OPT_UNLOCK (osched);
1518
1519   GST_LOG ("%d datas left on %s:%s's datapen after chain handler",
1520       g_list_length (GST_PAD_DATALIST (peer)), GST_DEBUG_PAD_NAME (peer));
1521 }
1522
1523 /* this function is called by a loop based element that performs a
1524  * pull on a sinkpad. We schedule the peer group until the datapen
1525  * is filled with the data so that this function can return */
1526 static GstData *
1527 gst_opt_scheduler_get_wrapper (GstPad * srcpad)
1528 {
1529   GstData *data;
1530   GstOptSchedulerGroup *group;
1531   GstOptScheduler *osched;
1532   gboolean disabled;
1533
1534   GST_LOG ("get handler for %" GST_PTR_FORMAT, srcpad);
1535
1536   /* first try to grab a queued data */
1537   if (GST_PAD_DATALIST (srcpad)) {
1538     data = GST_PAD_DATALIST (srcpad)->data;
1539     GST_PAD_DATAPEN (srcpad) = g_list_remove (GST_PAD_DATALIST (srcpad), data);
1540
1541     GST_LOG ("returning popped queued data %p", data);
1542
1543     return data;
1544   }
1545   GST_LOG ("need to schedule the peer element");
1546
1547   /* else we need to schedule the peer element */
1548   get_group (GST_PAD_PARENT (srcpad), &group);
1549   if (group == NULL) {
1550     /* wow, peer has no group */
1551     GST_LOG ("peer without group detected");
1552     //group_error_handler (group);
1553     return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1554   }
1555   osched = group->chain->sched;
1556   data = NULL;
1557   disabled = FALSE;
1558
1559   GST_OPT_LOCK (osched);
1560   do {
1561     GST_LOG ("scheduling upstream group %p to fill datapen", group);
1562 #ifdef USE_COTHREADS
1563     schedule_group (group);
1564 #else
1565     if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
1566       ref_group (group);
1567
1568       if (!g_list_find (osched->runqueue, group)) {
1569         ref_group (group);
1570         osched->runqueue = g_list_append (osched->runqueue, group);
1571       }
1572
1573       GST_LOG ("recursing into scheduler group %p", group);
1574       gst_opt_scheduler_schedule_run_queue (osched, group);
1575       GST_LOG ("return from recurse group %p", group);
1576
1577       /* if the other group was disabled we might have to break out of the loop */
1578       disabled = GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group);
1579       group = unref_group (group);
1580       /* group is gone */
1581       if (group == NULL) {
1582         /* if the group was gone we also might have to break out of the loop */
1583         disabled = TRUE;
1584       }
1585     } else {
1586       /* in this case, the group was running and we wanted to swtich to it,
1587        * this is not allowed in the optimal scheduler (yet) */
1588       g_warning ("deadlock detected, disabling group %p", group);
1589       group_error_handler (group);
1590       data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1591       goto done;
1592     }
1593 #endif
1594     /* if the scheduler interrupted, make sure we send an INTERRUPTED event
1595      * to the loop based element */
1596     if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
1597       GST_INFO ("scheduler interrupted, return interrupt event");
1598       data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1599     } else {
1600       if (GST_PAD_DATALIST (srcpad)) {
1601         data = GST_PAD_DATALIST (srcpad)->data;
1602         GST_PAD_DATAPEN (srcpad) =
1603             g_list_remove (GST_PAD_DATALIST (srcpad), data);
1604       } else if (disabled) {
1605         /* no data in queue and peer group was disabled */
1606         data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1607       }
1608     }
1609   }
1610   while (data == NULL);
1611
1612 #ifndef USE_COTHREADS
1613 done:
1614 #endif
1615   GST_OPT_UNLOCK (osched);
1616
1617   GST_LOG ("get handler, returning data %p, queue length %d",
1618       data, g_list_length (GST_PAD_DATALIST (srcpad)));
1619
1620   return data;
1621 }
1622
1623 static void
1624 pad_clear_queued (GstPad * srcpad, gpointer user_data)
1625 {
1626   GList *datalist = GST_PAD_DATALIST (srcpad);
1627
1628   if (datalist) {
1629     GST_LOG ("need to clear some datas");
1630     g_list_foreach (datalist, (GFunc) gst_data_unref, NULL);
1631     g_list_free (datalist);
1632     GST_PAD_DATAPEN (srcpad) = NULL;
1633   }
1634 }
1635
1636 static gboolean
1637 gst_opt_scheduler_event_wrapper (GstPad * srcpad, GstEvent * event)
1638 {
1639   gboolean flush;
1640
1641   GST_DEBUG ("intercepting event type %d on pad %s:%s",
1642       GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (srcpad));
1643
1644   /* figure out if this is a flush event */
1645   switch (GST_EVENT_TYPE (event)) {
1646     case GST_EVENT_FLUSH:
1647       flush = TRUE;
1648       break;
1649     case GST_EVENT_SEEK:
1650     case GST_EVENT_SEEK_SEGMENT:
1651       flush = GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH;
1652       break;
1653     default:
1654       flush = FALSE;
1655       break;
1656   }
1657
1658   if (flush) {
1659     GST_LOG ("event triggers a flush");
1660
1661     pad_clear_queued (srcpad, NULL);
1662   }
1663   return GST_RPAD_EVENTFUNC (srcpad) (srcpad, event);
1664 }
1665
1666 static GstElementStateReturn
1667 gst_opt_scheduler_state_transition (GstScheduler * sched, GstElement * element,
1668     gint transition)
1669 {
1670   GstOptSchedulerGroup *group;
1671   GstElementStateReturn res = GST_STATE_SUCCESS;
1672
1673   GST_DEBUG ("element \"%s\" state change (%04x)",
1674       GST_ELEMENT_NAME (element) ? GST_ELEMENT_NAME (element) : "(null)",
1675       transition);
1676
1677   GST_OPT_LOCK (sched);
1678   /* we check the state of the managing pipeline here */
1679   if (GST_IS_BIN (element)) {
1680     if (GST_SCHEDULER_PARENT (sched) == element) {
1681       GST_LOG ("parent \"%s\" changed state",
1682           GST_ELEMENT_NAME (element) ? GST_ELEMENT_NAME (element) : "(null)");
1683
1684       switch (transition) {
1685         case GST_STATE_PLAYING_TO_PAUSED:
1686           GST_INFO ("setting scheduler state to stopped");
1687           GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_STOPPED;
1688           break;
1689         case GST_STATE_PAUSED_TO_PLAYING:
1690           GST_INFO ("setting scheduler state to running");
1691           GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_RUNNING;
1692           break;
1693         default:
1694           GST_LOG ("no interesting state change, doing nothing");
1695       }
1696     }
1697     goto done;
1698   }
1699
1700   /* we don't care about decoupled elements after this */
1701   if (GST_ELEMENT_IS_DECOUPLED (element)) {
1702     res = GST_STATE_SUCCESS;
1703     goto done;
1704   }
1705
1706   /* get the group of the element */
1707   group = GST_ELEMENT_SCHED_GROUP (element);
1708
1709   switch (transition) {
1710     case GST_STATE_PAUSED_TO_PLAYING:
1711       /* an element without a group has to be an unlinked src, sink
1712        * filter element */
1713       if (!group) {
1714         GST_INFO ("element \"%s\" has no group", GST_ELEMENT_NAME (element));
1715       }
1716       /* else construct the scheduling context of this group and enable it */
1717       else {
1718         group_element_set_enabled (group, element, TRUE);
1719       }
1720       break;
1721     case GST_STATE_PLAYING_TO_PAUSED:
1722       /* if the element still has a group, we disable it */
1723       if (group)
1724         group_element_set_enabled (group, element, FALSE);
1725       break;
1726     case GST_STATE_PAUSED_TO_READY:
1727     {
1728       GList *pads = (GList *) element->pads;
1729
1730       g_list_foreach (pads, (GFunc) pad_clear_queued, NULL);
1731       break;
1732     }
1733     default:
1734       break;
1735   }
1736
1737   //gst_scheduler_show (sched);
1738
1739 done:
1740   GST_OPT_UNLOCK (sched);
1741
1742   return res;
1743 }
1744
1745 static void
1746 gst_opt_scheduler_scheduling_change (GstScheduler * sched, GstElement * element)
1747 {
1748   g_warning ("scheduling change, implement me");
1749 }
1750
1751 static void
1752 get_group (GstElement * element, GstOptSchedulerGroup ** group)
1753 {
1754   GstOptSchedulerCtx *ctx;
1755
1756   ctx = GST_ELEMENT_SCHED_CONTEXT (element);
1757   if (ctx)
1758     *group = ctx->group;
1759   else
1760     *group = NULL;
1761 }
1762
1763 /*
1764  * the idea is to put the two elements into the same group. 
1765  * - When no element is inside a group, we create a new group and add 
1766  *   the elements to it. 
1767  * - When one of the elements has a group, add the other element to 
1768  *   that group
1769  * - if both of the elements have a group, we merge the groups, which
1770  *   will also merge the chains.
1771  * Group links must be managed by the caller.
1772  */
1773 static GstOptSchedulerGroup *
1774 group_elements (GstOptScheduler * osched, GstElement * element1,
1775     GstElement * element2, GstOptSchedulerGroupType type)
1776 {
1777   GstOptSchedulerGroup *group1, *group2, *group = NULL;
1778
1779   get_group (element1, &group1);
1780   get_group (element2, &group2);
1781
1782   /* none of the elements is added to a group, create a new group
1783    * and chain to add the elements to */
1784   if (!group1 && !group2) {
1785     GstOptSchedulerChain *chain;
1786
1787     GST_DEBUG ("creating new group to hold \"%s\" and \"%s\"",
1788         GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
1789
1790     chain = create_chain (osched);
1791     group = create_group (chain, element1, type);
1792     add_to_group (group, element2, TRUE);
1793   }
1794   /* the first element has a group */
1795   else if (group1) {
1796     GST_DEBUG ("adding \"%s\" to \"%s\"'s group",
1797         GST_ELEMENT_NAME (element2), GST_ELEMENT_NAME (element1));
1798
1799     /* the second element also has a group, merge */
1800     if (group2)
1801       merge_groups (group1, group2);
1802     /* the second element has no group, add it to the group
1803      * of the first element */
1804     else
1805       add_to_group (group1, element2, TRUE);
1806
1807     group = group1;
1808   }
1809   /* element1 has no group, element2 does. Add element1 to the
1810    * group of element2 */
1811   else {
1812     GST_DEBUG ("adding \"%s\" to \"%s\"'s group",
1813         GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
1814     add_to_group (group2, element1, TRUE);
1815     group = group2;
1816   }
1817   return group;
1818 }
1819
1820 /*
1821  * increment link counts between groups -- it's important that src is actually
1822  * the src group, so we can introspect the topology later
1823  */
1824 static void
1825 group_inc_link (GstOptSchedulerGroup * src, GstOptSchedulerGroup * sink)
1826 {
1827   GSList *links = src->group_links;
1828   gboolean done = FALSE;
1829   GstOptSchedulerGroupLink *link;
1830
1831   /* first try to find a previous link */
1832   while (links && !done) {
1833     link = (GstOptSchedulerGroupLink *) links->data;
1834     links = g_slist_next (links);
1835
1836     if (IS_GROUP_LINK (link, src, sink)) {
1837       /* we found a link to this group, increment the link count */
1838       link->count++;
1839       GST_LOG ("incremented group link count between %p and %p to %d",
1840           src, sink, link->count);
1841       done = TRUE;
1842     }
1843   }
1844   if (!done) {
1845     /* no link was found, create a new one */
1846     link = g_new0 (GstOptSchedulerGroupLink, 1);
1847
1848     link->src = src;
1849     link->sink = sink;
1850     link->count = 1;
1851
1852     src->group_links = g_slist_prepend (src->group_links, link);
1853     sink->group_links = g_slist_prepend (sink->group_links, link);
1854
1855     src->sched->live_links++;
1856
1857     GST_DEBUG ("added group link between %p and %p, %d live links now",
1858         src, sink, src->sched->live_links);
1859   }
1860 }
1861
1862 /*
1863  * decrement link counts between groups, returns TRUE if the link count reaches
1864  * 0 -- note that the groups are not necessarily ordered as (src, sink) like
1865  * inc_link requires
1866  */
1867 static gboolean
1868 group_dec_link (GstOptSchedulerGroup * group1, GstOptSchedulerGroup * group2)
1869 {
1870   GSList *links = group1->group_links;
1871   gboolean res = FALSE;
1872   GstOptSchedulerGroupLink *link;
1873
1874   while (links) {
1875     link = (GstOptSchedulerGroupLink *) links->data;
1876     links = g_slist_next (links);
1877
1878     if (IS_GROUP_LINK (link, group1, group2)) {
1879       g_assert (link->count > 0);
1880       link->count--;
1881       GST_LOG ("link count between %p and %p is now %d",
1882           group1, group2, link->count);
1883       if (link->count == 0) {
1884         GstOptSchedulerGroup *iso_group = NULL;
1885
1886         group1->group_links = g_slist_remove (group1->group_links, link);
1887         group2->group_links = g_slist_remove (group2->group_links, link);
1888         group1->sched->live_links--;
1889
1890         GST_LOG ("%d live links now", group1->sched->live_links);
1891
1892         g_free (link);
1893         GST_DEBUG ("removed group link between %p and %p", group1, group2);
1894         if (group1->group_links == NULL) {
1895           /* group1 has no more links with other groups */
1896           iso_group = group1;
1897         } else if (group2->group_links == NULL) {
1898           /* group2 has no more links with other groups */
1899           iso_group = group2;
1900         }
1901         if (iso_group) {
1902           GstOptSchedulerChain *chain;
1903
1904           GST_DEBUG ("group %p has become isolated, moving to new chain",
1905               iso_group);
1906
1907           chain = create_chain (iso_group->chain->sched);
1908           remove_from_chain (iso_group->chain, iso_group);
1909           add_to_chain (chain, iso_group);
1910         }
1911         res = TRUE;
1912       }
1913       break;
1914     }
1915   }
1916   return res;
1917 }
1918
1919
1920 typedef enum
1921 {
1922   GST_OPT_INVALID,
1923   GST_OPT_GET_TO_CHAIN,
1924   GST_OPT_LOOP_TO_CHAIN,
1925   GST_OPT_GET_TO_LOOP,
1926   GST_OPT_CHAIN_TO_CHAIN,
1927   GST_OPT_CHAIN_TO_LOOP,
1928   GST_OPT_LOOP_TO_LOOP
1929 }
1930 LinkType;
1931
1932 /*
1933  * Entry points for this scheduler.
1934  */
1935 static void
1936 gst_opt_scheduler_setup (GstScheduler * sched)
1937 {
1938 #ifdef USE_COTHREADS
1939   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1940
1941   /* first create thread context */
1942   if (osched->context == NULL) {
1943     GST_DEBUG ("initializing cothread context");
1944     osched->context = do_cothread_context_init ();
1945   }
1946 #endif
1947 }
1948
1949 static void
1950 gst_opt_scheduler_reset (GstScheduler * sched)
1951 {
1952 #ifdef USE_COTHREADS
1953   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1954   GSList *chains = osched->chains;
1955
1956   while (chains) {
1957     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
1958     GSList *groups = chain->groups;
1959
1960     while (groups) {
1961       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
1962
1963       destroy_group_scheduler (group);
1964       groups = groups->next;
1965     }
1966     chains = chains->next;
1967   }
1968
1969   if (osched->context) {
1970     do_cothread_context_destroy (osched->context);
1971     osched->context = NULL;
1972   }
1973 #endif
1974 }
1975
1976 static void
1977 gst_opt_scheduler_add_element (GstScheduler * sched, GstElement * element)
1978 {
1979   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1980   GstOptSchedulerCtx *ctx;
1981   const GList *pads;
1982
1983   GST_DEBUG_OBJECT (sched, "adding element \"%s\"", GST_OBJECT_NAME (element));
1984
1985   /* decoupled elements are not added to the scheduler lists */
1986   if (GST_ELEMENT_IS_DECOUPLED (element))
1987     return;
1988
1989   ctx = g_new0 (GstOptSchedulerCtx, 1);
1990   GST_ELEMENT (element)->sched_private = ctx;
1991   ctx->flags = GST_OPT_SCHEDULER_CTX_DISABLED;
1992
1993   /* set event handler on all pads here so events work unconnected too;
1994    * in _link, it can be overruled if need be */
1995   /* FIXME: we should also do this when new pads on the element are created;
1996      but there are no hooks, so we do it again in _link */
1997   pads = element->pads;
1998   while (pads) {
1999     GstPad *pad = GST_PAD (pads->data);
2000
2001     pads = g_list_next (pads);
2002
2003     if (!GST_IS_REAL_PAD (pad))
2004       continue;
2005     GST_RPAD_EVENTHANDLER (pad) = GST_RPAD_EVENTFUNC (pad);
2006   }
2007
2008   /* loop based elements *always* end up in their own group. It can eventually
2009    * be merged with another group when a link is made */
2010   if (element->loopfunc) {
2011     GstOptSchedulerGroup *group;
2012     GstOptSchedulerChain *chain;
2013
2014     GST_OPT_LOCK (sched);
2015     chain = create_chain (osched);
2016
2017     group = create_group (chain, element, GST_OPT_SCHEDULER_GROUP_LOOP);
2018     group->entry = element;
2019     GST_OPT_UNLOCK (sched);
2020
2021     GST_LOG ("added element \"%s\" as loop based entry",
2022         GST_ELEMENT_NAME (element));
2023   }
2024 }
2025
2026 static void
2027 gst_opt_scheduler_remove_element (GstScheduler * sched, GstElement * element)
2028 {
2029   GstOptSchedulerGroup *group;
2030
2031   GST_DEBUG_OBJECT (sched, "removing element \"%s\"",
2032       GST_OBJECT_NAME (element));
2033
2034   GST_OPT_LOCK (sched);
2035   /* decoupled elements are not added to the scheduler lists and should therefore
2036    * not be removed */
2037   if (GST_ELEMENT_IS_DECOUPLED (element)) {
2038     remove_decoupled (sched, element);
2039     goto done;
2040   }
2041
2042   /* the element is guaranteed to live in it's own group/chain now */
2043   get_group (element, &group);
2044   if (group) {
2045     remove_from_group (group, element);
2046   }
2047
2048   g_free (GST_ELEMENT (element)->sched_private);
2049   GST_ELEMENT (element)->sched_private = NULL;
2050
2051 done:
2052   GST_OPT_UNLOCK (sched);
2053 }
2054
2055 static gboolean
2056 gst_opt_scheduler_yield (GstScheduler * sched, GstElement * element)
2057 {
2058 #ifdef USE_COTHREADS
2059   /* yield hands control to the main cothread context if the requesting 
2060    * element is the entry point of the group */
2061   GstOptSchedulerGroup *group;
2062
2063   get_group (element, &group);
2064   if (group && group->entry == element)
2065     do_cothread_switch (do_cothread_get_main (((GstOptScheduler *) sched)->
2066             context));
2067
2068   return FALSE;
2069 #else
2070   g_warning ("element %s performs a yield, please fix the element",
2071       GST_ELEMENT_NAME (element));
2072   return TRUE;
2073 #endif
2074 }
2075
2076 static gboolean
2077 gst_opt_scheduler_interrupt (GstScheduler * sched, GstElement * element)
2078 {
2079   GST_INFO ("interrupt from \"%s\"", GST_OBJECT_NAME (element));
2080
2081 #ifdef USE_COTHREADS
2082   do_cothread_switch (do_cothread_get_main (((GstOptScheduler *) sched)->
2083           context));
2084   return FALSE;
2085 #else
2086   {
2087     GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2088
2089     GST_OPT_LOCK (sched);
2090     GST_INFO ("scheduler set interrupted state");
2091     osched->state = GST_OPT_SCHEDULER_STATE_INTERRUPTED;
2092     GST_OPT_UNLOCK (sched);
2093   }
2094   return TRUE;
2095 #endif
2096 }
2097
2098 static void
2099 gst_opt_scheduler_error (GstScheduler * sched, GstElement * element)
2100 {
2101   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2102   GstOptSchedulerGroup *group;
2103
2104   GST_OPT_LOCK (sched);
2105   get_group (element, &group);
2106   if (group)
2107     group_error_handler (group);
2108
2109   osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
2110   GST_OPT_UNLOCK (sched);
2111 }
2112
2113 /* link pads, merge groups and chains */
2114 static void
2115 gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
2116     GstPad * sinkpad)
2117 {
2118   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2119   LinkType type = GST_OPT_INVALID;
2120   GstElement *src_element, *sink_element;
2121
2122   GST_INFO ("scheduling link between %s:%s and %s:%s",
2123       GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
2124
2125   src_element = GST_PAD_PARENT (srcpad);
2126   sink_element = GST_PAD_PARENT (sinkpad);
2127
2128   GST_OPT_LOCK (sched);
2129   /* first we need to figure out what type of link we're dealing
2130    * with */
2131   if (src_element->loopfunc && sink_element->loopfunc)
2132     type = GST_OPT_LOOP_TO_LOOP;
2133   else {
2134     if (src_element->loopfunc) {
2135       if (GST_RPAD_CHAINFUNC (sinkpad))
2136         type = GST_OPT_LOOP_TO_CHAIN;
2137     } else if (sink_element->loopfunc) {
2138       if (GST_RPAD_GETFUNC (srcpad)) {
2139         type = GST_OPT_GET_TO_LOOP;
2140         /* this could be tricky, the get based source could 
2141          * already be part of a loop based group in another pad,
2142          * we assert on that for now */
2143         if (GST_ELEMENT_SCHED_CONTEXT (src_element) != NULL &&
2144             GST_ELEMENT_SCHED_GROUP (src_element) != NULL) {
2145           GstOptSchedulerGroup *group = GST_ELEMENT_SCHED_GROUP (src_element);
2146
2147           /* if the loop based element is the entry point we're ok, if it
2148            * isn't then we have multiple loop based elements in this group */
2149           if (group->entry != sink_element) {
2150             g_error
2151                 ("internal error: cannot schedule get to loop in multi-loop based group");
2152             goto done;
2153           }
2154         }
2155       } else
2156         type = GST_OPT_CHAIN_TO_LOOP;
2157     } else {
2158       if (GST_RPAD_GETFUNC (srcpad) && GST_RPAD_CHAINFUNC (sinkpad)) {
2159         type = GST_OPT_GET_TO_CHAIN;
2160         /* the get based source could already be part of a loop 
2161          * based group in another pad, we assert on that for now */
2162         if (GST_ELEMENT_SCHED_CONTEXT (src_element) != NULL &&
2163             GST_ELEMENT_SCHED_GROUP (src_element) != NULL) {
2164           GstOptSchedulerGroup *group = GST_ELEMENT_SCHED_GROUP (src_element);
2165
2166           /* if the get based element is the entry point we're ok, if it
2167            * isn't then we have a mixed loop/chain based group */
2168           if (group->entry != src_element) {
2169             g_error ("internal error: cannot schedule get to chain "
2170                 "with mixed loop/chain based group");
2171             goto done;
2172           }
2173         }
2174       } else
2175         type = GST_OPT_CHAIN_TO_CHAIN;
2176     }
2177   }
2178
2179   /* since we can't set event handlers on pad creation after addition, it is
2180    * best we set all of them again to the default before linking */
2181   GST_RPAD_EVENTHANDLER (srcpad) = GST_RPAD_EVENTFUNC (srcpad);
2182   GST_RPAD_EVENTHANDLER (sinkpad) = GST_RPAD_EVENTFUNC (sinkpad);
2183
2184   /* for each link type, perform specific actions */
2185   switch (type) {
2186     case GST_OPT_GET_TO_CHAIN:
2187     {
2188       GstOptSchedulerGroup *group = NULL;
2189
2190       GST_LOG ("get to chain based link");
2191
2192       /* setup get/chain handlers */
2193       GST_RPAD_GETHANDLER (srcpad) = get_invalid_call;
2194       GST_RPAD_CHAINHANDLER (sinkpad) = gst_pad_call_chain_function;
2195
2196       /* the two elements should be put into the same group, 
2197        * this also means that they are in the same chain automatically */
2198       group = group_elements (osched, src_element, sink_element,
2199           GST_OPT_SCHEDULER_GROUP_GET);
2200
2201       /* if there is not yet an entry in the group, select the source
2202        * element as the entry point and mark the group as a get based
2203        * group */
2204       if (!group->entry) {
2205         group->entry = src_element;
2206         group->type = GST_OPT_SCHEDULER_GROUP_GET;
2207
2208         GST_DEBUG ("setting \"%s\" as entry point of _get-based group %p",
2209             GST_ELEMENT_NAME (src_element), group);
2210
2211         setup_group_scheduler (osched, group);
2212       }
2213       break;
2214     }
2215     case GST_OPT_LOOP_TO_CHAIN:
2216     case GST_OPT_CHAIN_TO_CHAIN:
2217       GST_LOG ("loop/chain to chain based link");
2218
2219       GST_RPAD_GETHANDLER (srcpad) = get_invalid_call;
2220       GST_RPAD_CHAINHANDLER (sinkpad) = gst_pad_call_chain_function;
2221
2222       /* the two elements should be put into the same group, this also means
2223        * that they are in the same chain automatically, in case of a loop-based
2224        * src_element, there will be a group for src_element and sink_element
2225        * will be added to it. In the case a new group is created, we can't know
2226        * the type so we pass UNKNOWN as an arg */
2227       group_elements (osched, src_element, sink_element,
2228           GST_OPT_SCHEDULER_GROUP_UNKNOWN);
2229       break;
2230     case GST_OPT_GET_TO_LOOP:
2231       GST_LOG ("get to loop based link");
2232
2233       GST_RPAD_GETHANDLER (srcpad) = gst_pad_call_get_function;
2234       GST_RPAD_CHAINHANDLER (sinkpad) = chain_invalid_call;
2235
2236       /* the two elements should be put into the same group, this also means
2237        * that they are in the same chain automatically, sink_element is
2238        * loop-based so it already has a group where src_element will be added
2239        * to */
2240       group_elements (osched, src_element, sink_element,
2241           GST_OPT_SCHEDULER_GROUP_LOOP);
2242       break;
2243     case GST_OPT_CHAIN_TO_LOOP:
2244     case GST_OPT_LOOP_TO_LOOP:
2245     {
2246       GstOptSchedulerGroup *group1, *group2;
2247
2248       GST_LOG ("chain/loop to loop based link");
2249
2250       GST_RPAD_GETHANDLER (srcpad) = gst_opt_scheduler_get_wrapper;
2251       GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_loop_wrapper;
2252       /* events on the srcpad have to be intercepted as we might need to
2253        * flush the buffer lists, so override the given eventfunc */
2254       GST_RPAD_EVENTHANDLER (srcpad) = gst_opt_scheduler_event_wrapper;
2255
2256       group1 = GST_ELEMENT_SCHED_GROUP (src_element);
2257       group2 = GST_ELEMENT_SCHED_GROUP (sink_element);
2258
2259       g_assert (group2 != NULL);
2260
2261       /* group2 is guaranteed to exist as it contains a loop-based element.
2262        * group1 only exists if src_element is linked to some other element */
2263       if (!group1) {
2264         /* create a new group for src_element as it cannot be merged into another group
2265          * here. we create the group in the same chain as the loop-based element. note
2266          * that creating a new group will not increment the links with other groups */
2267         GST_DEBUG ("creating new group for element %s",
2268             GST_ELEMENT_NAME (src_element));
2269         group1 =
2270             create_group (group2->chain, src_element,
2271             GST_OPT_SCHEDULER_GROUP_LOOP);
2272       } else {
2273         /* both elements are already in a group, make sure they are added to
2274          * the same chain */
2275         merge_chains (group1->chain, group2->chain);
2276       }
2277       /* increment the group link counters */
2278       group_inc_link (group1, group2);
2279       break;
2280     }
2281     case GST_OPT_INVALID:
2282       g_error ("(internal error) invalid element link, what are you doing?");
2283       break;
2284   }
2285 done:
2286   GST_OPT_UNLOCK (sched);
2287 }
2288
2289 static void
2290 group_elements_set_visited (GstOptSchedulerGroup * group, gboolean visited)
2291 {
2292   GSList *elements;
2293
2294   for (elements = group->elements; elements; elements = g_slist_next (elements)) {
2295     GstElement *element = GST_ELEMENT (elements->data);
2296
2297     if (visited) {
2298       GST_ELEMENT_SET_VISITED (element);
2299     } else {
2300       GST_ELEMENT_UNSET_VISITED (element);
2301     }
2302   }
2303   /* don't forget to set any decoupled entry points that are not accounted for in the
2304    * element list (since they belong to two groups). */
2305   if (group->entry) {
2306     if (visited) {
2307       GST_ELEMENT_SET_VISITED (group->entry);
2308     } else {
2309       GST_ELEMENT_UNSET_VISITED (group->entry);
2310     }
2311   }
2312 }
2313
2314 static GList *
2315 element_get_reachables_func (GstElement * element, GstOptSchedulerGroup * group,
2316     GstPad * brokenpad)
2317 {
2318   GList *result = NULL;
2319   const GList *pads;
2320
2321   /* if no element or element not in group or been there, return NULL */
2322   if (element == NULL || !group_has_element (group, element) ||
2323       GST_ELEMENT_IS_VISITED (element))
2324     return NULL;
2325
2326   GST_ELEMENT_SET_VISITED (element);
2327
2328   result = g_list_prepend (result, element);
2329
2330   pads = element->pads;
2331   while (pads) {
2332     GstPad *pad = GST_PAD (pads->data);
2333     GstPad *peer;
2334
2335     pads = g_list_next (pads);
2336
2337     /* we only operate on real pads and on the pad that is not broken */
2338     if (!GST_IS_REAL_PAD (pad) || pad == brokenpad)
2339       continue;
2340
2341     peer = GST_PAD_PEER (pad);
2342     if (!GST_IS_REAL_PAD (peer) || peer == brokenpad)
2343       continue;
2344
2345     if (peer) {
2346       GstElement *parent;
2347       GList *res;
2348
2349       parent = GST_PAD_PARENT (peer);
2350
2351       res = element_get_reachables_func (parent, group, brokenpad);
2352
2353       result = g_list_concat (result, res);
2354     }
2355   }
2356
2357   return result;
2358 }
2359
2360 static GList *
2361 element_get_reachables (GstElement * element, GstOptSchedulerGroup * group,
2362     GstPad * brokenpad)
2363 {
2364   GList *result;
2365
2366   /* reset visited flags */
2367   group_elements_set_visited (group, FALSE);
2368
2369   result = element_get_reachables_func (element, group, brokenpad);
2370
2371   /* and reset visited flags again */
2372   group_elements_set_visited (group, FALSE);
2373
2374   return result;
2375 }
2376
2377 /* 
2378  * checks if a target group is still reachable from the group without taking the broken
2379  * group link into account.
2380  */
2381 static gboolean
2382 group_can_reach_group (GstOptSchedulerGroup * group,
2383     GstOptSchedulerGroup * target)
2384 {
2385   gboolean reachable = FALSE;
2386   const GSList *links = group->group_links;
2387
2388   GST_LOG ("checking if group %p can reach %p", group, target);
2389
2390   /* seems like we found the target element */
2391   if (group == target) {
2392     GST_LOG ("found way to reach %p", target);
2393     return TRUE;
2394   }
2395
2396   /* if the group is marked as visited, we don't need to check here */
2397   if (GST_OPT_SCHEDULER_GROUP_IS_FLAG_SET (group,
2398           GST_OPT_SCHEDULER_GROUP_VISITED)) {
2399     GST_LOG ("already visited %p", group);
2400     return FALSE;
2401   }
2402
2403   /* mark group as visited */
2404   GST_OPT_SCHEDULER_GROUP_SET_FLAG (group, GST_OPT_SCHEDULER_GROUP_VISITED);
2405
2406   while (links && !reachable) {
2407     GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) links->data;
2408     GstOptSchedulerGroup *other;
2409
2410     links = g_slist_next (links);
2411
2412     /* find other group in this link */
2413     other = OTHER_GROUP_LINK (link, group);
2414
2415     GST_LOG ("found link from %p to %p, count %d", group, other, link->count);
2416
2417     /* check if we can reach the target recursively */
2418     reachable = group_can_reach_group (other, target);
2419   }
2420   /* unset the visited flag, note that this is not optimal as we might be checking
2421    * groups several times when they are reachable with a loop. An alternative would be
2422    * to not clear the group flag at this stage but clear all flags in the chain when
2423    * all groups are checked. */
2424   GST_OPT_SCHEDULER_GROUP_UNSET_FLAG (group, GST_OPT_SCHEDULER_GROUP_VISITED);
2425
2426   GST_LOG ("leaving group %p with %s", group, (reachable ? "TRUE" : "FALSE"));
2427
2428   return reachable;
2429 }
2430
2431 /*
2432  * Go through all the pads of the given element and decrement the links that
2433  * this group has with the group of the peer element.  This function is mainly used
2434  * to update the group connections before we remove the element from the group.
2435  */
2436 static void
2437 group_dec_links_for_element (GstOptSchedulerGroup * group, GstElement * element)
2438 {
2439   GList *l;
2440   GstPad *pad;
2441   GstOptSchedulerGroup *peer_group;
2442
2443   for (l = GST_ELEMENT_PADS (element); l; l = l->next) {
2444     pad = (GstPad *) l->data;
2445     if (GST_IS_REAL_PAD (pad) && GST_PAD_PEER (pad)) {
2446       get_group (GST_PAD_PARENT (GST_PAD_PEER (pad)), &peer_group);
2447       if (peer_group && peer_group != group)
2448         group_dec_link (group, peer_group);
2449     }
2450   }
2451 }
2452
2453 /*
2454  * Go through all the pads of the given element and increment the links that
2455  * this group has with the group of the peer element.  This function is mainly used
2456  * to update the group connections before we add the element to the group.
2457  */
2458 static void
2459 group_inc_links_for_element (GstOptSchedulerGroup * group, GstElement * element)
2460 {
2461   GList *l;
2462   GstPad *pad;
2463   GstOptSchedulerGroup *peer_group;
2464
2465   GST_DEBUG ("group %p, element %s ", group, gst_element_get_name (element));
2466
2467   for (l = GST_ELEMENT_PADS (element); l; l = l->next) {
2468     pad = (GstPad *) l->data;
2469     if (GST_IS_REAL_PAD (pad) && GST_PAD_PEER (pad)) {
2470       get_group (GST_PAD_PARENT (GST_PAD_PEER (pad)), &peer_group);
2471       if (peer_group && peer_group != group)
2472         if (GST_PAD_DIRECTION (pad) == GST_PAD_SRC)
2473           group_inc_link (group, peer_group);
2474         else
2475           group_inc_link (peer_group, group);
2476     }
2477   }
2478 }
2479
2480 static void
2481 debug_element (GstElement * element, GstOptScheduler * osched)
2482 {
2483   GST_LOG ("element %s", gst_element_get_name (element));
2484 }
2485
2486 /* move this group in the chain of the groups it has links with */
2487 static void
2488 rechain_group (GstOptSchedulerGroup * group)
2489 {
2490   GstOptSchedulerChain *chain = NULL;
2491   GSList *links;
2492
2493   GST_LOG ("checking if this group needs rechaining");
2494
2495   /* follow all links */
2496   for (links = group->group_links; links; links = g_slist_next (links)) {
2497     GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) links->data;
2498     GstOptSchedulerGroup *other;
2499
2500     other = OTHER_GROUP_LINK (link, group);
2501     GST_LOG ("found link with other group %p with chain %p", other,
2502         other->chain);
2503
2504     /* first time, take chain */
2505     if (chain == NULL) {
2506       chain = other->chain;
2507     }
2508     /* second time, chain should be the same */
2509     else if (other->chain != chain) {
2510       g_warning ("(internal error): chain inconsistency");
2511     }
2512   }
2513   if (!chain) {
2514     GST_LOG ("no new chain found, not rechaining");
2515   } else if (chain != group->chain) {
2516     GST_LOG ("need to move group %p to chain %p", group, chain);
2517     /* remove from old chain */
2518     remove_from_chain (group->chain, group);
2519     /* and move to new chain */
2520     add_to_chain (chain, group);
2521   } else {
2522     GST_LOG ("group %p is in correct chain %p", group, chain);
2523   }
2524 }
2525
2526 /* make sure that the group does not contain only single element.
2527  * Only loop-based groups can contain a single element. */
2528 static GstOptSchedulerGroup *
2529 normalize_group (GstOptSchedulerGroup * group)
2530 {
2531   gint num;
2532   gboolean have_decoupled = FALSE;
2533
2534   if (group == NULL)
2535     return NULL;
2536
2537   num = group->num_elements;
2538   /* decoupled elements are not added to the group but are
2539    * added as an entry */
2540   if (group->entry && GST_ELEMENT_IS_DECOUPLED (group->entry)) {
2541     num++;
2542     have_decoupled = TRUE;
2543   }
2544
2545   if (num == 1 && group->type != GST_OPT_SCHEDULER_GROUP_LOOP) {
2546     GST_LOG ("removing last element from group %p", group);
2547     if (have_decoupled) {
2548       group->entry = NULL;
2549       if (group->chain) {
2550         GST_LOG ("removing group %p from its chain", group);
2551         chain_group_set_enabled (group->chain, group, FALSE);
2552         remove_from_chain (group->chain, group);
2553       }
2554       group = unref_group (group);
2555     } else {
2556       group = remove_from_group (group, GST_ELEMENT (group->elements->data));
2557     }
2558   }
2559   return group;
2560 }
2561
2562 /* migrate the element and all connected elements to a new group without looking at
2563  * the brokenpad */
2564 static GstOptSchedulerGroup *
2565 group_migrate_connected (GstOptScheduler * osched, GstElement * element,
2566     GstOptSchedulerGroup * group, GstPad * brokenpad)
2567 {
2568   GList *connected, *c;
2569   GstOptSchedulerGroup *new_group = NULL, *tst;
2570   GstOptSchedulerChain *chain;
2571   gint len;
2572
2573   if (GST_ELEMENT_IS_DECOUPLED (element)) {
2574     GST_LOG ("element is decoupled and thus not in the group");
2575     /* the element is decoupled and is therefore not in the group */
2576     return NULL;
2577   }
2578
2579   get_group (element, &tst);
2580   if (tst == NULL) {
2581     GST_LOG ("element has no group, not interesting");
2582     return NULL;
2583   }
2584
2585   GST_LOG ("migrate connected elements to new group");
2586   connected = element_get_reachables (element, group, brokenpad);
2587   GST_LOG ("elements to move to new group:");
2588   g_list_foreach (connected, (GFunc) debug_element, NULL);
2589
2590   len = g_list_length (connected);
2591
2592   if (len == 0) {
2593     g_warning ("(internal error) found lost element %s",
2594         gst_element_get_name (element));
2595     return NULL;
2596   } else if (len == 1) {
2597     group = remove_from_group (group, GST_ELEMENT (connected->data));
2598     GST_LOG
2599         ("not migrating to new group as the group would only contain 1 element");
2600     g_list_free (connected);
2601     GST_LOG ("new group is old group now");
2602     new_group = group;
2603   } else {
2604     /* we create a new chain to hold the new group */
2605     chain = create_chain (osched);
2606
2607     for (c = connected; c; c = g_list_next (c)) {
2608       GstElement *element = GST_ELEMENT (c->data);
2609
2610       group = remove_from_group (group, element);
2611       if (new_group == NULL) {
2612         new_group =
2613             create_group (chain, element, GST_OPT_SCHEDULER_GROUP_UNKNOWN);
2614       } else {
2615         add_to_group (new_group, element, TRUE);
2616       }
2617     }
2618     g_list_free (connected);
2619
2620     /* remove last element from the group if any. Make sure not to remove
2621      * the loop based entry point of a group as this always needs one group */
2622     if (group != NULL) {
2623       group = normalize_group (group);
2624     }
2625   }
2626
2627   if (new_group != NULL) {
2628     new_group = normalize_group (new_group);
2629     if (new_group == NULL)
2630       return NULL;
2631     /* at this point the new group lives in its own chain but might
2632      * have to be merged with another chain, this happens when the new
2633      * group has a link with another group in another chain */
2634     rechain_group (new_group);
2635   }
2636
2637
2638   return new_group;
2639 }
2640
2641 static void
2642 gst_opt_scheduler_pad_unlink (GstScheduler * sched,
2643     GstPad * srcpad, GstPad * sinkpad)
2644 {
2645   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2646   GstElement *src_element, *sink_element;
2647   GstOptSchedulerGroup *group1, *group2;
2648
2649   GST_INFO ("unscheduling link between %s:%s and %s:%s",
2650       GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
2651
2652   src_element = GST_PAD_PARENT (srcpad);
2653   sink_element = GST_PAD_PARENT (sinkpad);
2654
2655   GST_OPT_LOCK (sched);
2656   get_group (src_element, &group1);
2657   get_group (sink_element, &group2);
2658
2659   /* for decoupled elements (that are never put into a group) we use the
2660    * group of the peer element for the remainder of the algorithm */
2661   if (GST_ELEMENT_IS_DECOUPLED (src_element)) {
2662     group1 = group2;
2663   }
2664   if (GST_ELEMENT_IS_DECOUPLED (sink_element)) {
2665     group2 = group1;
2666   }
2667
2668   /* if one the elements has no group (anymore) we don't really care 
2669    * about the link */
2670   if (!group1 || !group2) {
2671     GST_LOG
2672         ("one (or both) of the elements is not in a group, not interesting");
2673     goto done;
2674   }
2675
2676   /* easy part, groups are different */
2677   if (group1 != group2) {
2678     gboolean zero;
2679
2680     GST_LOG ("elements are in different groups");
2681
2682     /* we can remove the links between the groups now */
2683     zero = group_dec_link (group1, group2);
2684
2685     /* if the groups are not directly connected anymore, we have to perform a
2686      * recursive check to see if they are really unlinked */
2687     if (zero) {
2688       gboolean still_link;
2689       GstOptSchedulerChain *chain;
2690
2691       /* see if group1 and group2 are still connected in any indirect way */
2692       still_link = group_can_reach_group (group1, group2);
2693
2694       GST_DEBUG ("group %p %s reach group %p", group1,
2695           (still_link ? "can" : "can't"), group2);
2696       if (!still_link) {
2697         /* groups are really disconnected, migrate one group to a new chain */
2698         chain = create_chain (osched);
2699         chain_recursively_migrate_group (chain, group1);
2700
2701         GST_DEBUG ("migrated group %p to new chain %p", group1, chain);
2702       }
2703     } else {
2704       GST_DEBUG ("group %p still has direct link with group %p", group1,
2705           group2);
2706     }
2707   }
2708   /* hard part, groups are equal */
2709   else {
2710     GstOptSchedulerGroup *group;
2711
2712     /* since group1 == group2, it doesn't matter which group we take */
2713     group = group1;
2714
2715     GST_LOG ("elements are in the same group %p", group);
2716
2717     if (group->entry == NULL) {
2718       /* it doesn't really matter, we just have to make sure that both 
2719        * elements end up in another group if they are not connected */
2720       GST_LOG ("group %p has no entry, moving source element to new group",
2721           group);
2722       group_migrate_connected (osched, src_element, group, srcpad);
2723     } else {
2724       GList *reachables;
2725
2726       GST_LOG ("group %p has entry %p", group, group->entry);
2727
2728       /* get of a list of all elements that are still managed by the old
2729        * entry element */
2730       reachables = element_get_reachables (group->entry, group, srcpad);
2731       GST_LOG ("elements still reachable from the entry:");
2732       g_list_foreach (reachables, (GFunc) debug_element, sched);
2733
2734       /* if the source is reachable from the entry, we can leave it in the group */
2735       if (g_list_find (reachables, src_element)) {
2736         GST_LOG
2737             ("source element still reachable from the entry, leaving in group");
2738       } else {
2739         GST_LOG
2740             ("source element not reachable from the entry, moving to new group");
2741         group_migrate_connected (osched, src_element, group, srcpad);
2742       }
2743
2744       /* if the sink is reachable from the entry, we can leave it in the group */
2745       if (g_list_find (reachables, sink_element)) {
2746         GST_LOG
2747             ("sink element still reachable from the entry, leaving in group");
2748       } else {
2749         GST_LOG
2750             ("sink element not reachable from the entry, moving to new group");
2751         group_migrate_connected (osched, sink_element, group, srcpad);
2752       }
2753       g_list_free (reachables);
2754     }
2755     /* at this point the group can be freed and gone, so don't touch */
2756   }
2757 done:
2758   GST_OPT_UNLOCK (sched);
2759 }
2760
2761 /* a scheduler iteration is done by looping and scheduling the active chains */
2762 static GstSchedulerState
2763 gst_opt_scheduler_iterate (GstScheduler * sched)
2764 {
2765   GstSchedulerState state = GST_SCHEDULER_STATE_STOPPED;
2766   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2767   gint iterations;
2768
2769   GST_OPT_LOCK (sched);
2770   iterations = osched->iterations;
2771
2772   osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
2773
2774   GST_DEBUG_OBJECT (sched, "iterating");
2775
2776   while (iterations) {
2777     gboolean scheduled = FALSE;
2778     GSList *chains;
2779
2780     /* we have to schedule each of the scheduler chains now. */
2781     chains = osched->chains;
2782     while (chains) {
2783       GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
2784
2785       ref_chain (chain);
2786       /* if the chain is not disabled, schedule it */
2787       if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
2788         GST_LOG ("scheduling chain %p", chain);
2789         scheduled = schedule_chain (chain);
2790         GST_LOG ("scheduled chain %p", chain);
2791       } else {
2792         GST_LOG ("not scheduling disabled chain %p", chain);
2793       }
2794
2795       /* don't schedule any more chains when in error */
2796       if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
2797         GST_ERROR_OBJECT (sched, "in error state");
2798         /* unref the chain here as we move out of the while loop */
2799         unref_chain (chain);
2800         break;
2801       } else if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
2802         GST_DEBUG_OBJECT (osched, "got interrupted, continue with next chain");
2803         osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
2804       }
2805
2806       /* grab the next chain before we unref, the list we are iterating
2807        * can only be updated in the unref method */
2808       chains = g_slist_next (chains);
2809       unref_chain (chain);
2810     }
2811
2812     /* at this point it's possible that the scheduler state is
2813      * in error, we then return an error */
2814     if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
2815       state = GST_SCHEDULER_STATE_ERROR;
2816       break;
2817     } else {
2818       /* if chains were scheduled, return our current state */
2819       if (scheduled)
2820         state = GST_SCHEDULER_STATE (sched);
2821       /* if no chains were scheduled, we say we are stopped */
2822       else {
2823         state = GST_SCHEDULER_STATE_STOPPED;
2824         break;
2825       }
2826     }
2827     if (iterations > 0)
2828       iterations--;
2829   }
2830   GST_OPT_UNLOCK (sched);
2831
2832   return state;
2833 }
2834
2835
2836 static void
2837 gst_opt_scheduler_show (GstScheduler * sched)
2838 {
2839   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2840   GSList *chains;
2841
2842   GST_OPT_LOCK (sched);
2843
2844   g_print ("iterations:    %d\n", osched->iterations);
2845   g_print ("max recursion: %d\n", osched->max_recursion);
2846
2847   chains = osched->chains;
2848   while (chains) {
2849     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
2850     GSList *groups = chain->groups;
2851
2852     chains = g_slist_next (chains);
2853
2854     g_print ("+- chain %p: refcount %d, %d groups, %d enabled, flags %d\n",
2855         chain, chain->refcount, chain->num_groups, chain->num_enabled,
2856         chain->flags);
2857
2858     while (groups) {
2859       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
2860       GSList *elements = group->elements;
2861       GSList *group_links = group->group_links;
2862
2863       groups = g_slist_next (groups);
2864
2865       g_print
2866           (" +- group %p: refcount %d, %d elements, %d enabled, flags %d, entry %s, %s\n",
2867           group, group->refcount, group->num_elements, group->num_enabled,
2868           group->flags,
2869           (group->entry ? GST_ELEMENT_NAME (group->entry) : "(none)"),
2870           (group->type ==
2871               GST_OPT_SCHEDULER_GROUP_GET ? "get-based" : "loop-based"));
2872
2873       while (elements) {
2874         GstElement *element = (GstElement *) elements->data;
2875
2876         elements = g_slist_next (elements);
2877
2878         g_print ("  +- element %s\n", GST_ELEMENT_NAME (element));
2879       }
2880       while (group_links) {
2881         GstOptSchedulerGroupLink *link =
2882             (GstOptSchedulerGroupLink *) group_links->data;
2883
2884         group_links = g_slist_next (group_links);
2885
2886         g_print ("group link %p between %p and %p, count %d\n",
2887             link, link->src, link->sink, link->count);
2888       }
2889     }
2890   }
2891   GST_OPT_UNLOCK (sched);
2892 }
2893
2894 static void
2895 gst_opt_scheduler_get_property (GObject * object, guint prop_id,
2896     GValue * value, GParamSpec * pspec)
2897 {
2898   GstOptScheduler *osched;
2899
2900   g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
2901
2902   osched = GST_OPT_SCHEDULER (object);
2903
2904   switch (prop_id) {
2905     case ARG_ITERATIONS:
2906       g_value_set_int (value, osched->iterations);
2907       break;
2908     case ARG_MAX_RECURSION:
2909       g_value_set_int (value, osched->max_recursion);
2910       break;
2911     default:
2912       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2913       break;
2914   }
2915 }
2916
2917 static void
2918 gst_opt_scheduler_set_property (GObject * object, guint prop_id,
2919     const GValue * value, GParamSpec * pspec)
2920 {
2921   GstOptScheduler *osched;
2922
2923   g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
2924
2925   osched = GST_OPT_SCHEDULER (object);
2926
2927   switch (prop_id) {
2928     case ARG_ITERATIONS:
2929       osched->iterations = g_value_get_int (value);
2930       break;
2931     case ARG_MAX_RECURSION:
2932       osched->max_recursion = g_value_get_int (value);
2933       break;
2934     default:
2935       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2936       break;
2937   }
2938 }