92969bdcd9057f0420a351dfa8dec3f8b7adfb82
[platform/upstream/gstreamer.git] / gst / schedulers / gstoptimalscheduler.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstscheduler.c: Default scheduling code for most cases
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #include <gst/gst.h>
28
29 GST_DEBUG_CATEGORY_STATIC (debug_scheduler);
30 #define GST_CAT_DEFAULT debug_scheduler
31
32 #ifdef USE_COTHREADS
33 # include "cothreads_compat.h"
34 #else
35 # define COTHREADS_NAME_CAPITAL ""
36 # define COTHREADS_NAME         ""
37 #endif
38
39 #define GST_ELEMENT_SCHED_CONTEXT(elem)         ((GstOptSchedulerCtx*) (GST_ELEMENT (elem)->sched_private))
40 #define GST_ELEMENT_SCHED_GROUP(elem)           (GST_ELEMENT_SCHED_CONTEXT (elem)->group)
41 /* need this first macro to not run into lvalue casts */
42 #define GST_PAD_DATAPEN(pad)                    (GST_REAL_PAD(pad)->sched_private)
43 #define GST_PAD_DATALIST(pad)                   ((GList*) GST_PAD_DATAPEN(pad))
44
45 #define GST_ELEMENT_COTHREAD_STOPPING                   GST_ELEMENT_SCHEDULER_PRIVATE1
46 #define GST_ELEMENT_IS_COTHREAD_STOPPING(element)       GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
47 #define GST_ELEMENT_VISITED                             GST_ELEMENT_SCHEDULER_PRIVATE2
48 #define GST_ELEMENT_IS_VISITED(element)                 GST_FLAG_IS_SET((element), GST_ELEMENT_VISITED)
49 #define GST_ELEMENT_SET_VISITED(element)                GST_FLAG_SET((element), GST_ELEMENT_VISITED)
50 #define GST_ELEMENT_UNSET_VISITED(element)              GST_FLAG_UNSET((element), GST_ELEMENT_VISITED)
51
52 typedef struct _GstOptScheduler GstOptScheduler;
53 typedef struct _GstOptSchedulerClass GstOptSchedulerClass;
54
55 #define GST_TYPE_OPT_SCHEDULER \
56   (gst_opt_scheduler_get_type())
57 #define GST_OPT_SCHEDULER(obj) \
58   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_OPT_SCHEDULER,GstOptScheduler))
59 #define GST_OPT_SCHEDULER_CLASS(klass) \
60   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_OPT_SCHEDULER,GstOptSchedulerClass))
61 #define GST_IS_OPT_SCHEDULER(obj) \
62   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_OPT_SCHEDULER))
63 #define GST_IS_OPT_SCHEDULER_CLASS(obj) \
64   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_OPT_SCHEDULER))
65
66 typedef enum
67 {
68   GST_OPT_SCHEDULER_STATE_NONE,
69   GST_OPT_SCHEDULER_STATE_STOPPED,
70   GST_OPT_SCHEDULER_STATE_ERROR,
71   GST_OPT_SCHEDULER_STATE_RUNNING,
72   GST_OPT_SCHEDULER_STATE_INTERRUPTED
73 }
74 GstOptSchedulerState;
75
76 struct _GstOptScheduler
77 {
78   GstScheduler parent;
79
80   GstOptSchedulerState state;
81
82 #ifdef USE_COTHREADS
83   cothread_context *context;
84 #endif
85   gint iterations;
86
87   GSList *elements;
88   GSList *chains;
89
90   GList *runqueue;
91   gint recursion;
92
93   gint max_recursion;
94   gint live_groups;
95   gint live_chains;
96   gint live_links;
97 };
98
99 struct _GstOptSchedulerClass
100 {
101   GstSchedulerClass parent_class;
102 };
103
104 static GType _gst_opt_scheduler_type = 0;
105
106 typedef enum
107 {
108   GST_OPT_SCHEDULER_CHAIN_DIRTY = (1 << 1),
109   GST_OPT_SCHEDULER_CHAIN_DISABLED = (1 << 2),
110   GST_OPT_SCHEDULER_CHAIN_RUNNING = (1 << 3)
111 }
112 GstOptSchedulerChainFlags;
113
114 #define GST_OPT_SCHEDULER_CHAIN_SET_DIRTY(chain)        ((chain)->flags |= GST_OPT_SCHEDULER_CHAIN_DIRTY)
115 #define GST_OPT_SCHEDULER_CHAIN_SET_CLEAN(chain)        ((chain)->flags &= ~GST_OPT_SCHEDULER_CHAIN_DIRTY)
116 #define GST_OPT_SCHEDULER_CHAIN_IS_DIRTY(chain)         ((chain)->flags & GST_OPT_SCHEDULER_CHAIN_DIRTY)
117
118 #define GST_OPT_SCHEDULER_CHAIN_DISABLE(chain)          ((chain)->flags |= GST_OPT_SCHEDULER_CHAIN_DISABLED)
119 #define GST_OPT_SCHEDULER_CHAIN_ENABLE(chain)           ((chain)->flags &= ~GST_OPT_SCHEDULER_CHAIN_DISABLED)
120 #define GST_OPT_SCHEDULER_CHAIN_IS_DISABLED(chain)      ((chain)->flags & GST_OPT_SCHEDULER_CHAIN_DISABLED)
121
122 typedef struct _GstOptSchedulerChain GstOptSchedulerChain;
123
124 struct _GstOptSchedulerChain
125 {
126   gint refcount;
127
128   GstOptScheduler *sched;
129
130   GstOptSchedulerChainFlags flags;
131
132   GSList *groups;               /* the groups in this chain */
133   gint num_groups;
134   gint num_enabled;
135 };
136
137 /* 
138  * elements that are scheduled in one cothread 
139  */
140 typedef enum
141 {
142   GST_OPT_SCHEDULER_GROUP_DIRTY = (1 << 1),     /* this group has been modified */
143   GST_OPT_SCHEDULER_GROUP_COTHREAD_STOPPING = (1 << 2), /* the group's cothread stops after one iteration */
144   GST_OPT_SCHEDULER_GROUP_DISABLED = (1 << 3),  /* this group is disabled */
145   GST_OPT_SCHEDULER_GROUP_RUNNING = (1 << 4),   /* this group is running */
146   GST_OPT_SCHEDULER_GROUP_SCHEDULABLE = (1 << 5),       /* this group is schedulable */
147   GST_OPT_SCHEDULER_GROUP_VISITED = (1 << 6)    /* this group is visited when finding links */
148 }
149 GstOptSchedulerGroupFlags;
150
151 typedef enum
152 {
153   GST_OPT_SCHEDULER_GROUP_UNKNOWN = 3,
154   GST_OPT_SCHEDULER_GROUP_GET = 1,
155   GST_OPT_SCHEDULER_GROUP_LOOP = 2
156 }
157 GstOptSchedulerGroupType;
158
159 #define GST_OPT_SCHEDULER_GROUP_SET_FLAG(group,flag)    ((group)->flags |= (flag))
160 #define GST_OPT_SCHEDULER_GROUP_UNSET_FLAG(group,flag)  ((group)->flags &= ~(flag))
161 #define GST_OPT_SCHEDULER_GROUP_IS_FLAG_SET(group,flag) ((group)->flags & (flag))
162
163 #define GST_OPT_SCHEDULER_GROUP_DISABLE(group)          ((group)->flags |= GST_OPT_SCHEDULER_GROUP_DISABLED)
164 #define GST_OPT_SCHEDULER_GROUP_ENABLE(group)           ((group)->flags &= ~GST_OPT_SCHEDULER_GROUP_DISABLED)
165 #define GST_OPT_SCHEDULER_GROUP_IS_ENABLED(group)       (!((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED))
166 #define GST_OPT_SCHEDULER_GROUP_IS_DISABLED(group)      ((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED)
167
168
169 typedef struct _GstOptSchedulerGroup GstOptSchedulerGroup;
170 typedef struct _GstOptSchedulerGroupLink GstOptSchedulerGroupLink;
171
172 /* used to keep track of links with other groups */
173 struct _GstOptSchedulerGroupLink
174 {
175   GstOptSchedulerGroup *src;    /* the group we are linked with */
176   GstOptSchedulerGroup *sink;   /* the group we are linked with */
177   gint count;                   /* the number of links with the group */
178 };
179
180 #define IS_GROUP_LINK(link, srcg, sinkg)        ((link->src == srcg && link->sink == sinkg) || \
181                                                  (link->sink == srcg && link->src == sinkg))
182 #define OTHER_GROUP_LINK(link, group)           (link->src == group ? link->sink : link->src)
183
184 typedef int (*GroupScheduleFunction) (int argc, char *argv[]);
185
186 struct _GstOptSchedulerGroup
187 {
188   GstOptSchedulerChain *chain;  /* the chain this group belongs to */
189   GstOptSchedulerGroupFlags flags;      /* flags for this group */
190   GstOptSchedulerGroupType type;        /* flags for this group */
191   GstOptScheduler *sched;       /* the scheduler */
192
193   gint refcount;
194
195   GSList *elements;             /* elements of this group */
196   gint num_elements;
197   gint num_enabled;
198   GstElement *entry;            /* the group's entry point */
199
200   GSList *group_links;          /* other groups that are linked with this group */
201
202 #ifdef USE_COTHREADS
203   cothread *cothread;           /* the cothread of this group */
204 #else
205   GroupScheduleFunction schedulefunc;
206 #endif
207   int argc;
208   char **argv;
209 };
210
211 /* 
212  * A group is a set of elements through which data can flow without switching
213  * cothreads or without invoking the scheduler's run queue.
214  */
215 static GstOptSchedulerGroup *ref_group (GstOptSchedulerGroup * group);
216 static GstOptSchedulerGroup *unref_group (GstOptSchedulerGroup * group);
217 static GstOptSchedulerGroup *create_group (GstOptSchedulerChain * chain,
218     GstElement * element, GstOptSchedulerGroupType type);
219 static void destroy_group (GstOptSchedulerGroup * group);
220 static GstOptSchedulerGroup *add_to_group (GstOptSchedulerGroup * group,
221     GstElement * element, gboolean with_links);
222 static GstOptSchedulerGroup *remove_from_group (GstOptSchedulerGroup * group,
223     GstElement * element);
224 static void group_dec_links_for_element (GstOptSchedulerGroup * group,
225     GstElement * element);
226 static void group_inc_links_for_element (GstOptSchedulerGroup * group,
227     GstElement * element);
228 static GstOptSchedulerGroup *merge_groups (GstOptSchedulerGroup * group1,
229     GstOptSchedulerGroup * group2);
230 static void setup_group_scheduler (GstOptScheduler * osched,
231     GstOptSchedulerGroup * group);
232 static void destroy_group_scheduler (GstOptSchedulerGroup * group);
233 static void group_error_handler (GstOptSchedulerGroup * group);
234 static void group_element_set_enabled (GstOptSchedulerGroup * group,
235     GstElement * element, gboolean enabled);
236 static gboolean schedule_group (GstOptSchedulerGroup * group);
237 static void get_group (GstElement * element, GstOptSchedulerGroup ** group);
238
239
240 /* 
241  * A chain is a set of groups that are linked to each other.
242  */
243 static void destroy_chain (GstOptSchedulerChain * chain);
244 static GstOptSchedulerChain *create_chain (GstOptScheduler * osched);
245 static GstOptSchedulerChain *ref_chain (GstOptSchedulerChain * chain);
246 static GstOptSchedulerChain *unref_chain (GstOptSchedulerChain * chain);
247 static GstOptSchedulerChain *add_to_chain (GstOptSchedulerChain * chain,
248     GstOptSchedulerGroup * group);
249 static GstOptSchedulerChain *remove_from_chain (GstOptSchedulerChain * chain,
250     GstOptSchedulerGroup * group);
251 static GstOptSchedulerChain *merge_chains (GstOptSchedulerChain * chain1,
252     GstOptSchedulerChain * chain2);
253 static void chain_recursively_migrate_group (GstOptSchedulerChain * chain,
254     GstOptSchedulerGroup * group);
255 static void chain_group_set_enabled (GstOptSchedulerChain * chain,
256     GstOptSchedulerGroup * group, gboolean enabled);
257 static void schedule_chain (GstOptSchedulerChain * chain);
258
259
260 /*
261  * The schedule functions are the entry points for cothreads, or called directly
262  * by gst_opt_scheduler_schedule_run_queue
263  */
264 static int get_group_schedule_function (int argc, char *argv[]);
265 static int loop_group_schedule_function (int argc, char *argv[]);
266 static int unknown_group_schedule_function (int argc, char *argv[]);
267
268
269 /*
270  * These wrappers are set on the pads as the chain handler (what happens when
271  * gst_pad_push is called) or get handler (for gst_pad_pull).
272  */
273 static void gst_opt_scheduler_loop_wrapper (GstPad * sinkpad, GstData * data);
274 static GstData *gst_opt_scheduler_get_wrapper (GstPad * srcpad);
275
276
277 /*
278  * Without cothreads, gst_pad_push or gst_pad_pull on a loop-based group will
279  * just queue the peer element on a list. We need to actually run the queue
280  * instead of relying on cothreads to do the switch for us.
281  */
282 #ifndef USE_COTHREADS
283 static void gst_opt_scheduler_schedule_run_queue (GstOptScheduler * osched);
284 #endif
285
286
287 /* 
288  * Scheduler private data for an element 
289  */
290 typedef struct _GstOptSchedulerCtx GstOptSchedulerCtx;
291
292 typedef enum
293 {
294   GST_OPT_SCHEDULER_CTX_DISABLED = (1 << 1)     /* the element is disabled */
295 }
296 GstOptSchedulerCtxFlags;
297
298 struct _GstOptSchedulerCtx
299 {
300   GstOptSchedulerGroup *group;  /* the group this element belongs to */
301
302   GstOptSchedulerCtxFlags flags;        /* flags for this element */
303 };
304
305
306 /*
307  * Implementation of GstScheduler
308  */
309 enum
310 {
311   ARG_0,
312   ARG_ITERATIONS,
313   ARG_MAX_RECURSION
314 };
315
316 static void gst_opt_scheduler_class_init (GstOptSchedulerClass * klass);
317 static void gst_opt_scheduler_init (GstOptScheduler * scheduler);
318
319 static void gst_opt_scheduler_set_property (GObject * object, guint prop_id,
320     const GValue * value, GParamSpec * pspec);
321 static void gst_opt_scheduler_get_property (GObject * object, guint prop_id,
322     GValue * value, GParamSpec * pspec);
323
324 static void gst_opt_scheduler_dispose (GObject * object);
325
326 static void gst_opt_scheduler_setup (GstScheduler * sched);
327 static void gst_opt_scheduler_reset (GstScheduler * sched);
328 static void gst_opt_scheduler_add_element (GstScheduler * sched,
329     GstElement * element);
330 static void gst_opt_scheduler_remove_element (GstScheduler * sched,
331     GstElement * element);
332 static GstElementStateReturn gst_opt_scheduler_state_transition (GstScheduler *
333     sched, GstElement * element, gint transition);
334 static void gst_opt_scheduler_scheduling_change (GstScheduler * sched,
335     GstElement * element);
336 static gboolean gst_opt_scheduler_yield (GstScheduler * sched,
337     GstElement * element);
338 static gboolean gst_opt_scheduler_interrupt (GstScheduler * sched,
339     GstElement * element);
340 static void gst_opt_scheduler_error (GstScheduler * sched,
341     GstElement * element);
342 static void gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
343     GstPad * sinkpad);
344 static void gst_opt_scheduler_pad_unlink (GstScheduler * sched, GstPad * srcpad,
345     GstPad * sinkpad);
346 static GstSchedulerState gst_opt_scheduler_iterate (GstScheduler * sched);
347
348 static void gst_opt_scheduler_show (GstScheduler * sched);
349
350 static GstSchedulerClass *parent_class = NULL;
351
352 static GType
353 gst_opt_scheduler_get_type (void)
354 {
355   if (!_gst_opt_scheduler_type) {
356     static const GTypeInfo scheduler_info = {
357       sizeof (GstOptSchedulerClass),
358       NULL,
359       NULL,
360       (GClassInitFunc) gst_opt_scheduler_class_init,
361       NULL,
362       NULL,
363       sizeof (GstOptScheduler),
364       0,
365       (GInstanceInitFunc) gst_opt_scheduler_init,
366       NULL
367     };
368
369     _gst_opt_scheduler_type = g_type_register_static (GST_TYPE_SCHEDULER,
370         "GstOpt" COTHREADS_NAME_CAPITAL "Scheduler", &scheduler_info, 0);
371   }
372   return _gst_opt_scheduler_type;
373 }
374
375 static void
376 gst_opt_scheduler_class_init (GstOptSchedulerClass * klass)
377 {
378   GObjectClass *gobject_class;
379   GstObjectClass *gstobject_class;
380   GstSchedulerClass *gstscheduler_class;
381
382   gobject_class = (GObjectClass *) klass;
383   gstobject_class = (GstObjectClass *) klass;
384   gstscheduler_class = (GstSchedulerClass *) klass;
385
386   parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
387
388   gobject_class->set_property =
389       GST_DEBUG_FUNCPTR (gst_opt_scheduler_set_property);
390   gobject_class->get_property =
391       GST_DEBUG_FUNCPTR (gst_opt_scheduler_get_property);
392   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_opt_scheduler_dispose);
393
394   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ITERATIONS,
395       g_param_spec_int ("iterations", "Iterations",
396           "Number of groups to schedule in one iteration (-1 == until EOS/error)",
397           -1, G_MAXINT, 1, G_PARAM_READWRITE));
398 #ifndef USE_COTHREADS
399   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_RECURSION,
400       g_param_spec_int ("max_recursion", "Max recursion",
401           "Maximum number of recursions", 1, G_MAXINT, 100, G_PARAM_READWRITE));
402 #endif
403
404   gstscheduler_class->setup = GST_DEBUG_FUNCPTR (gst_opt_scheduler_setup);
405   gstscheduler_class->reset = GST_DEBUG_FUNCPTR (gst_opt_scheduler_reset);
406   gstscheduler_class->add_element =
407       GST_DEBUG_FUNCPTR (gst_opt_scheduler_add_element);
408   gstscheduler_class->remove_element =
409       GST_DEBUG_FUNCPTR (gst_opt_scheduler_remove_element);
410   gstscheduler_class->state_transition =
411       GST_DEBUG_FUNCPTR (gst_opt_scheduler_state_transition);
412   gstscheduler_class->scheduling_change =
413       GST_DEBUG_FUNCPTR (gst_opt_scheduler_scheduling_change);
414   gstscheduler_class->yield = GST_DEBUG_FUNCPTR (gst_opt_scheduler_yield);
415   gstscheduler_class->interrupt =
416       GST_DEBUG_FUNCPTR (gst_opt_scheduler_interrupt);
417   gstscheduler_class->error = GST_DEBUG_FUNCPTR (gst_opt_scheduler_error);
418   gstscheduler_class->pad_link = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_link);
419   gstscheduler_class->pad_unlink =
420       GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_unlink);
421   gstscheduler_class->clock_wait = NULL;
422   gstscheduler_class->iterate = GST_DEBUG_FUNCPTR (gst_opt_scheduler_iterate);
423   gstscheduler_class->show = GST_DEBUG_FUNCPTR (gst_opt_scheduler_show);
424
425 #ifdef USE_COTHREADS
426   do_cothreads_init (NULL);
427 #endif
428 }
429
430 static void
431 gst_opt_scheduler_init (GstOptScheduler * scheduler)
432 {
433   scheduler->elements = NULL;
434   scheduler->iterations = 1;
435   scheduler->max_recursion = 100;
436   scheduler->live_groups = 0;
437   scheduler->live_chains = 0;
438   scheduler->live_links = 0;
439 }
440
441 static void
442 gst_opt_scheduler_dispose (GObject * object)
443 {
444   G_OBJECT_CLASS (parent_class)->dispose (object);
445 }
446
447 static gboolean
448 plugin_init (GstPlugin * plugin)
449 {
450   GstSchedulerFactory *factory;
451
452   GST_DEBUG_CATEGORY_INIT (debug_scheduler, "scheduler", 0,
453       "optimal scheduler");
454
455 #ifdef USE_COTHREADS
456   factory = gst_scheduler_factory_new ("opt" COTHREADS_NAME,
457       "An optimal scheduler using " COTHREADS_NAME " cothreads",
458       gst_opt_scheduler_get_type ());
459 #else
460   factory = gst_scheduler_factory_new ("opt",
461       "An optimal scheduler using no cothreads", gst_opt_scheduler_get_type ());
462 #endif
463
464   if (factory != NULL) {
465     gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory));
466   } else {
467     g_warning ("could not register scheduler: optimal");
468   }
469   return TRUE;
470 }
471
472 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
473     GST_VERSION_MINOR,
474     "gstopt" COTHREADS_NAME "scheduler",
475     "An optimal scheduler using " COTHREADS_NAME " cothreads",
476     plugin_init, VERSION, GST_LICENSE, GST_PACKAGE, GST_ORIGIN);
477
478
479 static GstOptSchedulerChain *
480 ref_chain (GstOptSchedulerChain * chain)
481 {
482   GST_LOG ("ref chain %p %d->%d", chain, chain->refcount, chain->refcount + 1);
483   chain->refcount++;
484
485   return chain;
486 }
487
488 static GstOptSchedulerChain *
489 unref_chain (GstOptSchedulerChain * chain)
490 {
491   GST_LOG ("unref chain %p %d->%d", chain,
492       chain->refcount, chain->refcount - 1);
493
494   if (--chain->refcount == 0) {
495     destroy_chain (chain);
496     chain = NULL;
497   }
498
499   return chain;
500 }
501
502 static GstOptSchedulerChain *
503 create_chain (GstOptScheduler * osched)
504 {
505   GstOptSchedulerChain *chain;
506
507   chain = g_new0 (GstOptSchedulerChain, 1);
508   chain->sched = osched;
509   chain->refcount = 1;
510   chain->flags = GST_OPT_SCHEDULER_CHAIN_DISABLED;
511   osched->live_chains++;
512
513   gst_object_ref (GST_OBJECT (osched));
514   osched->chains = g_slist_prepend (osched->chains, chain);
515
516   GST_LOG ("new chain %p, %d live chains now", chain, osched->live_chains);
517
518   return chain;
519 }
520
521 static void
522 destroy_chain (GstOptSchedulerChain * chain)
523 {
524   GstOptScheduler *osched;
525
526   GST_LOG ("destroy chain %p", chain);
527
528   g_assert (chain->num_groups == 0);
529   g_assert (chain->groups == NULL);
530
531   osched = chain->sched;
532   osched->chains = g_slist_remove (osched->chains, chain);
533   osched->live_chains--;
534
535   GST_LOG ("%d live chains now", osched->live_chains);
536
537   gst_object_unref (GST_OBJECT (osched));
538
539   g_free (chain);
540 }
541
542 static GstOptSchedulerChain *
543 add_to_chain (GstOptSchedulerChain * chain, GstOptSchedulerGroup * group)
544 {
545   gboolean enabled;
546
547   GST_LOG ("adding group %p to chain %p", group, chain);
548
549   g_assert (group->chain == NULL);
550
551   group = ref_group (group);
552
553   group->chain = ref_chain (chain);
554   chain->groups = g_slist_prepend (chain->groups, group);
555   chain->num_groups++;
556
557   enabled = GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group);
558
559   if (enabled) {
560     /* we can now setup the scheduling of the group */
561     setup_group_scheduler (chain->sched, group);
562
563     chain->num_enabled++;
564     if (chain->num_enabled == chain->num_groups) {
565       GST_LOG ("enabling chain %p after adding of enabled group", chain);
566       GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
567     }
568   }
569
570   /* queue a resort of the group list, which determines which group will be run
571    * first. */
572   GST_OPT_SCHEDULER_CHAIN_SET_DIRTY (chain);
573
574   return chain;
575 }
576
577 static GstOptSchedulerChain *
578 remove_from_chain (GstOptSchedulerChain * chain, GstOptSchedulerGroup * group)
579 {
580   gboolean enabled;
581
582   GST_LOG ("removing group %p from chain %p", group, chain);
583
584   if (!chain)
585     return NULL;
586
587   g_assert (group);
588   g_assert (group->chain == chain);
589
590   enabled = GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group);
591
592   group->chain = NULL;
593   chain->groups = g_slist_remove (chain->groups, group);
594   chain->num_groups--;
595   unref_group (group);
596
597   if (chain->num_groups == 0)
598     chain = unref_chain (chain);
599   else {
600     /* removing an enabled group from the chain decrements the 
601      * enabled counter */
602     if (enabled) {
603       chain->num_enabled--;
604       if (chain->num_enabled == 0) {
605         GST_LOG ("disabling chain %p after removal of the only enabled group",
606             chain);
607         GST_OPT_SCHEDULER_CHAIN_DISABLE (chain);
608       }
609     } else {
610       if (chain->num_enabled == chain->num_groups) {
611         GST_LOG ("enabling chain %p after removal of the only disabled group",
612             chain);
613         GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
614       }
615     }
616   }
617
618   GST_OPT_SCHEDULER_CHAIN_SET_DIRTY (chain);
619
620   chain = unref_chain (chain);
621   return chain;
622 }
623
624 static GstOptSchedulerChain *
625 merge_chains (GstOptSchedulerChain * chain1, GstOptSchedulerChain * chain2)
626 {
627   GSList *walk;
628
629   g_assert (chain1 != NULL);
630
631   GST_LOG ("merging chain %p and %p", chain1, chain2);
632
633   /* FIXME: document how chain2 can be NULL */
634   if (chain1 == chain2 || chain2 == NULL)
635     return chain1;
636
637   /* switch if it's more efficient */
638   if (chain1->num_groups < chain2->num_groups) {
639     GstOptSchedulerChain *tmp = chain2;
640
641     chain2 = chain1;
642     chain1 = tmp;
643   }
644
645   walk = chain2->groups;
646   while (walk) {
647     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
648
649     walk = g_slist_next (walk);
650
651     GST_LOG ("reparenting group %p from chain %p to %p", group, chain2, chain1);
652
653     ref_group (group);
654
655     remove_from_chain (chain2, group);
656     add_to_chain (chain1, group);
657
658     unref_group (group);
659   }
660
661   /* chain2 is now freed, if nothing else was referencing it before */
662
663   return chain1;
664 }
665
666 /* sorts the group list so that terminal sinks come first -- prevents pileup of
667  * datas in datapens */
668 static void
669 sort_chain (GstOptSchedulerChain * chain)
670 {
671   GSList *original = chain->groups;
672   GSList *new = NULL;
673   GSList *walk, *links, *this;
674
675   /* if there's only one group, just return */
676   if (!original->next)
677     return;
678   /* otherwise, we know that all groups are somehow linked together */
679
680   GST_LOG ("sorting chain %p (%d groups)", chain, g_slist_length (original));
681
682   /* first find the terminal sinks */
683   for (walk = original; walk;) {
684     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
685
686     this = walk;
687     walk = walk->next;
688     if (group->group_links) {
689       gboolean is_sink = TRUE;
690
691       for (links = group->group_links; links; links = links->next)
692         if (((GstOptSchedulerGroupLink *) links->data)->src == group)
693           is_sink = FALSE;
694       if (is_sink) {
695         /* found one */
696         original = g_slist_remove_link (original, this);
697         new = g_slist_concat (new, this);
698       }
699     }
700   }
701   g_assert (new != NULL);
702
703   /* now look for the elements that are linked to the terminal sinks */
704   for (walk = new; walk; walk = walk->next) {
705     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
706
707     for (links = group->group_links; links; links = links->next) {
708       this =
709           g_slist_find (original,
710           ((GstOptSchedulerGroupLink *) links->data)->src);
711       if (this) {
712         original = g_slist_remove_link (original, this);
713         new = g_slist_concat (new, this);
714       }
715     }
716   }
717   g_assert (original == NULL);
718
719   chain->groups = new;
720 }
721
722 static void
723 chain_group_set_enabled (GstOptSchedulerChain * chain,
724     GstOptSchedulerGroup * group, gboolean enabled)
725 {
726   gboolean oldstate;
727
728   g_assert (group != NULL);
729   g_assert (chain != NULL);
730
731   GST_LOG
732       ("request to %d group %p in chain %p, have %d groups enabled out of %d",
733       enabled, group, chain, chain->num_enabled, chain->num_groups);
734
735   oldstate = (GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group) ? TRUE : FALSE);
736   if (oldstate == enabled) {
737     GST_LOG ("group %p in chain %p was in correct state", group, chain);
738     return;
739   }
740
741   if (enabled)
742     GST_OPT_SCHEDULER_GROUP_ENABLE (group);
743   else
744     GST_OPT_SCHEDULER_GROUP_DISABLE (group);
745
746   if (enabled) {
747     g_assert (chain->num_enabled < chain->num_groups);
748
749     chain->num_enabled++;
750
751     GST_DEBUG ("enable group %p in chain %p, now %d groups enabled out of %d",
752         group, chain, chain->num_enabled, chain->num_groups);
753
754     /* OK to call even if the scheduler (cothread context / schedulerfunc) was
755        setup already -- will get destroyed when the group is destroyed */
756     setup_group_scheduler (chain->sched, group);
757
758     if (chain->num_enabled == chain->num_groups) {
759       GST_DEBUG ("enable chain %p", chain);
760       GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
761     }
762   } else {
763     g_assert (chain->num_enabled > 0);
764
765     chain->num_enabled--;
766     GST_DEBUG ("disable group %p in chain %p, now %d groups enabled out of %d",
767         group, chain, chain->num_enabled, chain->num_groups);
768
769     if (chain->num_enabled == 0) {
770       GST_DEBUG ("disable chain %p", chain);
771       GST_OPT_SCHEDULER_CHAIN_DISABLE (chain);
772     }
773   }
774 }
775
776 /* recursively migrate the group and all connected groups into the new chain */
777 static void
778 chain_recursively_migrate_group (GstOptSchedulerChain * chain,
779     GstOptSchedulerGroup * group)
780 {
781   GSList *links;
782
783   /* group already in chain */
784   if (group->chain == chain)
785     return;
786
787   /* first remove the group from its old chain */
788   remove_from_chain (group->chain, group);
789   /* add to new chain */
790   add_to_chain (chain, group);
791
792   /* then follow all links */
793   for (links = group->group_links; links; links = g_slist_next (links)) {
794     GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) links->data;
795
796     chain_recursively_migrate_group (chain, OTHER_GROUP_LINK (link, group));
797   }
798 }
799
800 static GstOptSchedulerGroup *
801 ref_group (GstOptSchedulerGroup * group)
802 {
803   GST_LOG ("ref group %p %d->%d", group, group->refcount, group->refcount + 1);
804
805   group->refcount++;
806
807   return group;
808 }
809
810 static GstOptSchedulerGroup *
811 unref_group (GstOptSchedulerGroup * group)
812 {
813   GST_LOG ("unref group %p %d->%d", group,
814       group->refcount, group->refcount - 1);
815
816   if (--group->refcount == 0) {
817     destroy_group (group);
818     group = NULL;
819   }
820
821   return group;
822 }
823
824 static GstOptSchedulerGroup *
825 create_group (GstOptSchedulerChain * chain, GstElement * element,
826     GstOptSchedulerGroupType type)
827 {
828   GstOptSchedulerGroup *group;
829
830   group = g_new0 (GstOptSchedulerGroup, 1);
831   GST_LOG ("new group %p, type %d", group, type);
832   group->refcount = 1;          /* float... */
833   group->flags = GST_OPT_SCHEDULER_GROUP_DISABLED;
834   group->type = type;
835   group->sched = chain->sched;
836   group->sched->live_groups++;
837
838   add_to_group (group, element, FALSE);
839   add_to_chain (chain, group);
840   group = unref_group (group);  /* ...and sink. */
841
842   GST_LOG ("%d live groups now", group->sched->live_groups);
843   /* group's refcount is now 2 (one for the element, one for the chain) */
844
845   return group;
846 }
847
848 static void
849 destroy_group (GstOptSchedulerGroup * group)
850 {
851   GST_LOG ("destroy group %p", group);
852
853   g_assert (group != NULL);
854   g_assert (group->elements == NULL);
855   g_assert (group->chain == NULL);
856   g_assert (group->group_links == NULL);
857
858   if (group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)
859     destroy_group_scheduler (group);
860
861   group->sched->live_groups--;
862   GST_LOG ("%d live groups now", group->sched->live_groups);
863
864   g_free (group);
865 }
866
867 static GstOptSchedulerGroup *
868 add_to_group (GstOptSchedulerGroup * group, GstElement * element,
869     gboolean with_links)
870 {
871   g_assert (group != NULL);
872   g_assert (element != NULL);
873
874   GST_DEBUG ("adding element \"%s\" to group %p", GST_ELEMENT_NAME (element),
875       group);
876
877   if (GST_ELEMENT_IS_DECOUPLED (element)) {
878     GST_DEBUG ("element \"%s\" is decoupled, not adding to group %p",
879         GST_ELEMENT_NAME (element), group);
880     return group;
881   }
882
883   g_assert (GST_ELEMENT_SCHED_GROUP (element) == NULL);
884
885   /* first increment the links that this group has with other groups through
886    * this element */
887   if (with_links)
888     group_inc_links_for_element (group, element);
889
890   /* Ref the group... */
891   GST_ELEMENT_SCHED_GROUP (element) = ref_group (group);
892
893   gst_object_ref (GST_OBJECT (element));
894   group->elements = g_slist_prepend (group->elements, element);
895   group->num_elements++;
896
897   if (gst_element_get_state (element) == GST_STATE_PLAYING) {
898     group_element_set_enabled (group, element, TRUE);
899   }
900
901   return group;
902 }
903
904 static GstOptSchedulerGroup *
905 remove_from_group (GstOptSchedulerGroup * group, GstElement * element)
906 {
907   GST_DEBUG ("removing element \"%s\" from group %p",
908       GST_ELEMENT_NAME (element), group);
909
910   g_assert (group != NULL);
911   g_assert (element != NULL);
912   g_assert (GST_ELEMENT_SCHED_GROUP (element) == group);
913
914   /* first decrement the links that this group has with other groups through
915    * this element */
916   group_dec_links_for_element (group, element);
917
918   group->elements = g_slist_remove (group->elements, element);
919   group->num_elements--;
920
921   /* if the element was an entry point in the group, clear the group's
922    * entry point, and mark it as unknown */
923   if (group->entry == element) {
924     group->entry = NULL;
925     group->type = GST_OPT_SCHEDULER_GROUP_UNKNOWN;
926   }
927
928   GST_ELEMENT_SCHED_GROUP (element) = NULL;
929   gst_object_unref (GST_OBJECT (element));
930
931   if (group->num_elements == 0) {
932     GST_LOG ("group %p is now empty", group);
933     /* don't know in what case group->chain would be NULL, but putting this here
934        in deference to 0.8 -- remove me in 0.9 */
935     if (group->chain) {
936       GST_LOG ("removing group %p from its chain", group);
937       chain_group_set_enabled (group->chain, group, FALSE);
938       remove_from_chain (group->chain, group);
939     }
940   }
941   group = unref_group (group);
942
943   return group;
944 }
945
946 /* count number of elements in the group. Have to be careful because
947  * decoupled elements are added as entry point but are not added to
948  * the elements list */
949 static gint
950 group_num_elements (GstOptSchedulerGroup * group)
951 {
952   gint num;
953
954   num = group->num_elements;
955   /* decoupled elements are not added to the group but are
956    * added as an entry */
957   if (group->entry) {
958     if (GST_ELEMENT_IS_DECOUPLED (group->entry)) {
959       num++;
960     }
961   }
962   return num;
963 }
964
965 /* check if an element is part of the given group. We have to be carefull
966  * as decoupled elements are added as entry but are not added to the elements 
967  * list */
968 static gboolean
969 group_has_element (GstOptSchedulerGroup * group, GstElement * element)
970 {
971   if (group->entry == element)
972     return TRUE;
973
974   return (g_slist_find (group->elements, element) != NULL);
975 }
976
977 /* FIXME need to check if the groups are of the same type -- otherwise need to
978    setup the scheduler again, if it is setup */
979 static GstOptSchedulerGroup *
980 merge_groups (GstOptSchedulerGroup * group1, GstOptSchedulerGroup * group2)
981 {
982   g_assert (group1 != NULL);
983
984   GST_DEBUG ("merging groups %p and %p", group1, group2);
985
986   if (group1 == group2 || group2 == NULL)
987     return group1;
988
989   /* make sure they end up in the same chain */
990   merge_chains (group1->chain, group2->chain);
991
992   while (group2 && group2->elements) {
993     GstElement *element = (GstElement *) group2->elements->data;
994
995     group2 = remove_from_group (group2, element);
996     add_to_group (group1, element, TRUE);
997   }
998
999   return group1;
1000 }
1001
1002 /* setup the scheduler context for a group. The right schedule function
1003  * is selected based on the group type and cothreads are created if 
1004  * needed */
1005 static void
1006 setup_group_scheduler (GstOptScheduler * osched, GstOptSchedulerGroup * group)
1007 {
1008   GroupScheduleFunction wrapper;
1009
1010   GST_DEBUG ("setup group %p scheduler, type %d", group, group->type);
1011
1012   wrapper = unknown_group_schedule_function;
1013
1014   /* figure out the wrapper function for this group */
1015   if (group->type == GST_OPT_SCHEDULER_GROUP_GET)
1016     wrapper = get_group_schedule_function;
1017   else if (group->type == GST_OPT_SCHEDULER_GROUP_LOOP)
1018     wrapper = loop_group_schedule_function;
1019
1020 #ifdef USE_COTHREADS
1021   if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
1022     do_cothread_create (group->cothread, osched->context,
1023         (cothread_func) wrapper, 0, (char **) group);
1024   } else {
1025     do_cothread_setfunc (group->cothread, osched->context,
1026         (cothread_func) wrapper, 0, (char **) group);
1027   }
1028 #else
1029   group->schedulefunc = wrapper;
1030   group->argc = 0;
1031   group->argv = (char **) group;
1032 #endif
1033   group->flags |= GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
1034 }
1035
1036 static void
1037 destroy_group_scheduler (GstOptSchedulerGroup * group)
1038 {
1039   g_assert (group);
1040
1041   if (group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)
1042     g_warning ("destroying running group scheduler");
1043
1044 #ifdef USE_COTHREADS
1045   if (group->cothread) {
1046     do_cothread_destroy (group->cothread);
1047     group->cothread = NULL;
1048   }
1049 #else
1050   group->schedulefunc = NULL;
1051   group->argc = 0;
1052   group->argv = NULL;
1053 #endif
1054
1055   group->flags &= ~GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
1056 }
1057
1058 static void
1059 group_error_handler (GstOptSchedulerGroup * group)
1060 {
1061   GST_DEBUG ("group %p has errored", group);
1062
1063   chain_group_set_enabled (group->chain, group, FALSE);
1064   group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
1065 }
1066
1067 /* this function enables/disables an element, it will set/clear a flag on the element 
1068  * and tells the chain that the group is enabled if all elements inside the group are
1069  * enabled */
1070 static void
1071 group_element_set_enabled (GstOptSchedulerGroup * group, GstElement * element,
1072     gboolean enabled)
1073 {
1074   g_assert (group != NULL);
1075   g_assert (element != NULL);
1076
1077   GST_LOG
1078       ("request to %d element %s in group %p, have %d elements enabled out of %d",
1079       enabled, GST_ELEMENT_NAME (element), group, group->num_enabled,
1080       group->num_elements);
1081
1082   /* Note that if an unlinked PLAYING element is added to a bin, we have to
1083      create a new group to hold the element, and this function will be called
1084      before the group is added to the chain. Thus we have a valid case for
1085      group->chain==NULL. */
1086
1087   if (enabled) {
1088     g_assert (group->num_enabled < group->num_elements);
1089
1090     group->num_enabled++;
1091
1092     GST_DEBUG
1093         ("enable element %s in group %p, now %d elements enabled out of %d",
1094         GST_ELEMENT_NAME (element), group, group->num_enabled,
1095         group->num_elements);
1096
1097     if (group->num_enabled == group->num_elements) {
1098       if (!group->chain) {
1099         GST_DEBUG ("enable chainless group %p", group);
1100         GST_OPT_SCHEDULER_GROUP_ENABLE (group);
1101       } else {
1102         GST_LOG ("enable group %p", group);
1103         chain_group_set_enabled (group->chain, group, TRUE);
1104       }
1105     }
1106   } else {
1107     g_assert (group->num_enabled > 0);
1108
1109     group->num_enabled--;
1110
1111     GST_DEBUG
1112         ("disable element %s in group %p, now %d elements enabled out of %d",
1113         GST_ELEMENT_NAME (element), group, group->num_enabled,
1114         group->num_elements);
1115
1116     if (group->num_enabled == 0) {
1117       if (!group->chain) {
1118         GST_DEBUG ("disable chainless group %p", group);
1119         GST_OPT_SCHEDULER_GROUP_DISABLE (group);
1120       } else {
1121         GST_LOG ("disable group %p", group);
1122         chain_group_set_enabled (group->chain, group, FALSE);
1123       }
1124     }
1125   }
1126 }
1127
1128 /* a group is scheduled by doing a cothread switch to it or
1129  * by calling the schedule function. In the non-cothread case
1130  * we cannot run already running groups so we return FALSE here
1131  * to indicate this to the caller */
1132 static gboolean
1133 schedule_group (GstOptSchedulerGroup * group)
1134 {
1135   if (!group->entry) {
1136     GST_INFO ("not scheduling group %p without entry", group);
1137     /* FIXME, we return true here, while the group is actually
1138      * not schedulable. We might want to disable the element that caused
1139      * this group to be scheduled instead */
1140     return TRUE;
1141   }
1142 #ifdef USE_COTHREADS
1143   if (group->cothread)
1144     do_cothread_switch (group->cothread);
1145   else
1146     g_warning ("(internal error): trying to schedule group without cothread");
1147   return TRUE;
1148 #else
1149   /* cothreads automatically call the pre- and post-run functions for us;
1150    * without cothreads we need to call them manually */
1151   if (group->schedulefunc == NULL) {
1152     GST_INFO ("not scheduling group %p without schedulefunc", group);
1153     return FALSE;
1154   } else {
1155     GSList *l;
1156
1157     for (l = group->elements; l; l = l->next) {
1158       GstElement *e = (GstElement *) l->data;
1159
1160       if (e->pre_run_func)
1161         e->pre_run_func (e);
1162     }
1163
1164     group->schedulefunc (group->argc, group->argv);
1165
1166     for (l = group->elements; l; l = l->next) {
1167       GstElement *e = (GstElement *) l->data;
1168
1169       if (e->post_run_func)
1170         e->post_run_func (e);
1171     }
1172
1173   }
1174   return TRUE;
1175 #endif
1176 }
1177
1178 #ifndef USE_COTHREADS
1179 static void
1180 gst_opt_scheduler_schedule_run_queue (GstOptScheduler * osched)
1181 {
1182   GST_LOG_OBJECT (osched, "running queue: %d groups, recursed %d times",
1183       g_list_length (osched->runqueue),
1184       osched->recursion, g_list_length (osched->runqueue));
1185
1186   /* note that we have a ref on each group on the queue (unref after running) */
1187
1188   /* make sure we don't exceed max_recursion */
1189   if (osched->recursion > osched->max_recursion) {
1190     osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
1191     return;
1192   }
1193
1194   osched->recursion++;
1195
1196   while (osched->runqueue) {
1197     GstOptSchedulerGroup *group;
1198     gboolean res;
1199
1200     group = (GstOptSchedulerGroup *) osched->runqueue->data;
1201
1202     /* runqueue holds refcount to group */
1203     osched->runqueue = g_list_remove (osched->runqueue, group);
1204
1205     GST_LOG_OBJECT (osched, "scheduling group %p", group);
1206
1207     if (GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group)) {
1208       res = schedule_group (group);
1209     } else {
1210       GST_INFO_OBJECT (osched,
1211           "group was disabled while it was on the queue, not scheduling");
1212       res = TRUE;
1213     }
1214     if (!res) {
1215       g_warning ("error scheduling group %p", group);
1216       group_error_handler (group);
1217     } else {
1218       GST_LOG_OBJECT (osched, "done scheduling group %p", group);
1219     }
1220     unref_group (group);
1221   }
1222
1223   GST_LOG_OBJECT (osched, "run queue length after scheduling %d",
1224       g_list_length (osched->runqueue));
1225
1226   osched->recursion--;
1227 }
1228 #endif
1229
1230 /* a chain is scheduled by picking the first active group and scheduling it */
1231 static void
1232 schedule_chain (GstOptSchedulerChain * chain)
1233 {
1234   GSList *groups;
1235   GstOptScheduler *osched;
1236
1237   osched = chain->sched;
1238
1239   /* if the chain has changed, we need to resort the groups so we enter in the
1240      proper place */
1241   if (GST_OPT_SCHEDULER_CHAIN_IS_DIRTY (chain))
1242     sort_chain (chain);
1243   GST_OPT_SCHEDULER_CHAIN_SET_CLEAN (chain);
1244
1245   groups = chain->groups;
1246   while (groups) {
1247     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
1248
1249     if (!GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group)) {
1250       ref_group (group);
1251       GST_LOG ("scheduling group %p in chain %p", group, chain);
1252
1253 #ifdef USE_COTHREADS
1254       schedule_group (group);
1255 #else
1256       osched->recursion = 0;
1257       if (!g_list_find (osched->runqueue, group)) {
1258         ref_group (group);
1259         osched->runqueue = g_list_append (osched->runqueue, group);
1260       }
1261       gst_opt_scheduler_schedule_run_queue (osched);
1262 #endif
1263
1264       GST_LOG ("done scheduling group %p in chain %p", group, chain);
1265       unref_group (group);
1266       break;
1267     }
1268
1269     groups = g_slist_next (groups);
1270   }
1271 }
1272
1273 /* a get-based group is scheduled by getting a buffer from the get based
1274  * entry point and by pushing the buffer to the peer.
1275  * We also set the running flag on this group for as long as this
1276  * function is running. */
1277 static int
1278 get_group_schedule_function (int argc, char *argv[])
1279 {
1280   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1281   GstElement *entry = group->entry;
1282   const GList *pads = gst_element_get_pad_list (entry);
1283
1284   GST_LOG ("executing get-based group %p", group);
1285
1286   group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
1287
1288   while (pads) {
1289     GstData *data;
1290     GstPad *pad = GST_PAD (pads->data);
1291
1292     pads = g_list_next (pads);
1293
1294     /* skip sinks and ghostpads */
1295     if (!GST_PAD_IS_SRC (pad) || !GST_IS_REAL_PAD (pad))
1296       continue;
1297
1298     GST_DEBUG ("doing get and push on pad \"%s:%s\" in group %p",
1299         GST_DEBUG_PAD_NAME (pad), group);
1300
1301     data = gst_pad_call_get_function (pad);
1302     if (data) {
1303       if (GST_EVENT_IS_INTERRUPT (data)) {
1304         GST_DEBUG ("unreffing interrupt event %p", data);
1305         gst_event_unref (GST_EVENT (data));
1306         break;
1307       }
1308       gst_pad_push (pad, data);
1309     }
1310   }
1311
1312   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
1313
1314   return 0;
1315 }
1316
1317 /* a loop-based group is scheduled by calling the loop function
1318  * on the entry point. 
1319  * We also set the running flag on this group for as long as this
1320  * function is running. */
1321 static int
1322 loop_group_schedule_function (int argc, char *argv[])
1323 {
1324   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1325   GstElement *entry = group->entry;
1326
1327   GST_LOG ("executing loop-based group %p", group);
1328
1329   group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
1330
1331   GST_DEBUG ("calling loopfunc of element %s in group %p",
1332       GST_ELEMENT_NAME (entry), group);
1333
1334   if (entry->loopfunc)
1335     entry->loopfunc (entry);
1336   else
1337     group_error_handler (group);
1338
1339   GST_LOG ("returned from loopfunc of element %s in group %p",
1340       GST_ELEMENT_NAME (entry), group);
1341
1342   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
1343
1344   return 0;
1345
1346 }
1347
1348 /* the function to schedule an unknown group, which just gives an error */
1349 static int
1350 unknown_group_schedule_function (int argc, char *argv[])
1351 {
1352   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1353
1354   g_warning ("(internal error) unknown group type %d, disabling\n",
1355       group->type);
1356   group_error_handler (group);
1357
1358   return 0;
1359 }
1360
1361 /* this function is called when the first element of a chain-loop or a loop-loop
1362  * link performs a push to the loop element. We then schedule the
1363  * group with the loop-based element until the datapen is empty */
1364 static void
1365 gst_opt_scheduler_loop_wrapper (GstPad * sinkpad, GstData * data)
1366 {
1367   GstOptSchedulerGroup *group;
1368   GstOptScheduler *osched;
1369   GstRealPad *peer;
1370
1371   group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (sinkpad));
1372   osched = group->chain->sched;
1373   peer = GST_RPAD_PEER (sinkpad);
1374
1375   GST_LOG ("chain handler for loop-based pad %" GST_PTR_FORMAT, sinkpad);
1376
1377 #ifdef USE_COTHREADS
1378   if (GST_PAD_DATALIST (peer)) {
1379     g_warning ("deadlock detected, disabling group %p", group);
1380     group_error_handler (group);
1381   } else {
1382     GST_LOG ("queueing data %p on %s:%s's datapen", data,
1383         GST_DEBUG_PAD_NAME (peer));
1384     GST_PAD_DATAPEN (peer) = g_list_append (GST_PAD_DATALIST (peer), data);
1385     schedule_group (group);
1386   }
1387 #else
1388   GST_LOG ("queueing data %p on %s:%s's datapen", data,
1389       GST_DEBUG_PAD_NAME (peer));
1390   GST_PAD_DATAPEN (peer) = g_list_append (GST_PAD_DATALIST (peer), data);
1391   if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
1392     GST_LOG ("adding group %p to runqueue", group);
1393     if (!g_list_find (osched->runqueue, group)) {
1394       ref_group (group);
1395       osched->runqueue = g_list_append (osched->runqueue, group);
1396     }
1397   }
1398 #endif
1399
1400   GST_LOG ("%d datas left on %s:%s's datapen after chain handler",
1401       g_list_length (GST_PAD_DATALIST (peer)), GST_DEBUG_PAD_NAME (peer));
1402 }
1403
1404 /* this function is called by a loop based element that performs a
1405  * pull on a sinkpad. We schedule the peer group until the datapen
1406  * is filled with the data so that this function can return */
1407 static GstData *
1408 gst_opt_scheduler_get_wrapper (GstPad * srcpad)
1409 {
1410   GstData *data;
1411   GstOptSchedulerGroup *group;
1412   GstOptScheduler *osched;
1413   gboolean disabled;
1414
1415   GST_LOG ("get handler for %" GST_PTR_FORMAT, srcpad);
1416
1417   /* first try to grab a queued data */
1418   if (GST_PAD_DATALIST (srcpad)) {
1419     data = GST_PAD_DATALIST (srcpad)->data;
1420     GST_PAD_DATAPEN (srcpad) = g_list_remove (GST_PAD_DATALIST (srcpad), data);
1421
1422     GST_LOG ("returning popped queued data %p", data);
1423
1424     return data;
1425   }
1426   GST_LOG ("need to schedule the peer element");
1427
1428   /* else we need to schedule the peer element */
1429   get_group (GST_PAD_PARENT (srcpad), &group);
1430   if (group == NULL) {
1431     /* wow, peer has no group */
1432     GST_LOG ("peer without group detected");
1433     //group_error_handler (group);
1434     return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1435   }
1436   osched = group->chain->sched;
1437   data = NULL;
1438   disabled = FALSE;
1439
1440   do {
1441     GST_LOG ("scheduling upstream group %p to fill datapen", group);
1442 #ifdef USE_COTHREADS
1443     schedule_group (group);
1444 #else
1445     if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
1446       ref_group (group);
1447
1448       if (!g_list_find (osched->runqueue, group)) {
1449         ref_group (group);
1450         osched->runqueue = g_list_append (osched->runqueue, group);
1451       }
1452
1453       GST_LOG ("recursing into scheduler group %p", group);
1454       gst_opt_scheduler_schedule_run_queue (osched);
1455       GST_LOG ("return from recurse group %p", group);
1456
1457       /* if the other group was disabled we might have to break out of the loop */
1458       disabled = GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group);
1459       group = unref_group (group);
1460       /* group is gone */
1461       if (group == NULL) {
1462         /* if the group was gone we also might have to break out of the loop */
1463         disabled = TRUE;
1464       }
1465     } else {
1466       /* in this case, the group was running and we wanted to swtich to it,
1467        * this is not allowed in the optimal scheduler (yet) */
1468       g_warning ("deadlock detected, disabling group %p", group);
1469       group_error_handler (group);
1470       return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1471     }
1472 #endif
1473     /* if the scheduler interrupted, make sure we send an INTERRUPTED event
1474      * to the loop based element */
1475     if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
1476       GST_INFO ("scheduler interrupted, return interrupt event");
1477       data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1478     } else {
1479       if (GST_PAD_DATALIST (srcpad)) {
1480         data = GST_PAD_DATALIST (srcpad)->data;
1481         GST_PAD_DATAPEN (srcpad) =
1482             g_list_remove (GST_PAD_DATALIST (srcpad), data);
1483       } else if (disabled) {
1484         /* no data in queue and peer group was disabled */
1485         data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1486       }
1487     }
1488   }
1489   while (data == NULL);
1490
1491   GST_LOG ("get handler, returning data %p, queue length %d",
1492       data, g_list_length (GST_PAD_DATALIST (srcpad)));
1493
1494   return data;
1495 }
1496
1497 static void
1498 pad_clear_queued (GstPad * srcpad, gpointer user_data)
1499 {
1500   GList *datalist = GST_PAD_DATALIST (srcpad);
1501
1502   if (datalist) {
1503     GST_LOG ("need to clear some datas");
1504     g_list_foreach (datalist, (GFunc) gst_data_unref, NULL);
1505     g_list_free (datalist);
1506     GST_PAD_DATAPEN (srcpad) = NULL;
1507   }
1508 }
1509
1510 static gboolean
1511 gst_opt_scheduler_event_wrapper (GstPad * srcpad, GstEvent * event)
1512 {
1513   gboolean flush;
1514
1515   GST_DEBUG ("intercepting event type %d on pad %s:%s",
1516       GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (srcpad));
1517
1518   /* figure out if this is a flush event */
1519   switch (GST_EVENT_TYPE (event)) {
1520     case GST_EVENT_FLUSH:
1521       flush = TRUE;
1522       break;
1523     case GST_EVENT_SEEK:
1524     case GST_EVENT_SEEK_SEGMENT:
1525       flush = GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH;
1526       break;
1527     default:
1528       flush = FALSE;
1529       break;
1530   }
1531
1532   if (flush) {
1533     GST_LOG ("event triggers a flush");
1534
1535     pad_clear_queued (srcpad, NULL);
1536   }
1537   return GST_RPAD_EVENTFUNC (srcpad) (srcpad, event);
1538 }
1539
1540 static GstElementStateReturn
1541 gst_opt_scheduler_state_transition (GstScheduler * sched, GstElement * element,
1542     gint transition)
1543 {
1544   GstOptSchedulerGroup *group;
1545   GstElementStateReturn res = GST_STATE_SUCCESS;
1546
1547   GST_DEBUG ("element \"%s\" state change (%04x)", GST_ELEMENT_NAME (element),
1548       transition);
1549
1550   /* we check the state of the managing pipeline here */
1551   if (GST_IS_BIN (element)) {
1552     if (GST_SCHEDULER_PARENT (sched) == element) {
1553       GST_LOG ("parent \"%s\" changed state", GST_ELEMENT_NAME (element));
1554
1555       switch (transition) {
1556         case GST_STATE_PLAYING_TO_PAUSED:
1557           GST_INFO ("setting scheduler state to stopped");
1558           GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_STOPPED;
1559           break;
1560         case GST_STATE_PAUSED_TO_PLAYING:
1561           GST_INFO ("setting scheduler state to running");
1562           GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_RUNNING;
1563           break;
1564         default:
1565           GST_LOG ("no interesting state change, doing nothing");
1566       }
1567     }
1568     return res;
1569   }
1570
1571   /* we don't care about decoupled elements after this */
1572   if (GST_ELEMENT_IS_DECOUPLED (element))
1573     return GST_STATE_SUCCESS;
1574
1575   /* get the group of the element */
1576   group = GST_ELEMENT_SCHED_GROUP (element);
1577
1578   switch (transition) {
1579     case GST_STATE_PAUSED_TO_PLAYING:
1580       /* an element without a group has to be an unlinked src, sink
1581        * filter element */
1582       if (!group) {
1583         GST_INFO ("element \"%s\" has no group", GST_ELEMENT_NAME (element));
1584       }
1585       /* else construct the scheduling context of this group and enable it */
1586       else {
1587         group_element_set_enabled (group, element, TRUE);
1588       }
1589       break;
1590     case GST_STATE_PLAYING_TO_PAUSED:
1591       /* if the element still has a group, we disable it */
1592       if (group)
1593         group_element_set_enabled (group, element, FALSE);
1594       break;
1595     case GST_STATE_PAUSED_TO_READY:
1596     {
1597       GList *pads = (GList *) gst_element_get_pad_list (element);
1598
1599       g_list_foreach (pads, (GFunc) pad_clear_queued, NULL);
1600       break;
1601     }
1602     default:
1603       break;
1604   }
1605
1606   return res;
1607 }
1608
1609 static void
1610 gst_opt_scheduler_scheduling_change (GstScheduler * sched, GstElement * element)
1611 {
1612   g_warning ("scheduling change, implement me");
1613 }
1614
1615 static void
1616 get_group (GstElement * element, GstOptSchedulerGroup ** group)
1617 {
1618   GstOptSchedulerCtx *ctx;
1619
1620   ctx = GST_ELEMENT_SCHED_CONTEXT (element);
1621   if (ctx)
1622     *group = ctx->group;
1623   else
1624     *group = NULL;
1625 }
1626
1627 /*
1628  * the idea is to put the two elements into the same group. 
1629  * - When no element is inside a group, we create a new group and add 
1630  *   the elements to it. 
1631  * - When one of the elements has a group, add the other element to 
1632  *   that group
1633  * - if both of the elements have a group, we merge the groups, which
1634  *   will also merge the chains.
1635  * Group links must be managed by the caller.
1636  */
1637 static GstOptSchedulerGroup *
1638 group_elements (GstOptScheduler * osched, GstElement * element1,
1639     GstElement * element2, GstOptSchedulerGroupType type)
1640 {
1641   GstOptSchedulerGroup *group1, *group2, *group = NULL;
1642
1643   get_group (element1, &group1);
1644   get_group (element2, &group2);
1645
1646   /* none of the elements is added to a group, create a new group
1647    * and chain to add the elements to */
1648   if (!group1 && !group2) {
1649     GstOptSchedulerChain *chain;
1650
1651     GST_DEBUG ("creating new group to hold \"%s\" and \"%s\"",
1652         GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
1653
1654     chain = create_chain (osched);
1655     group = create_group (chain, element1, type);
1656     add_to_group (group, element2, TRUE);
1657   }
1658   /* the first element has a group */
1659   else if (group1) {
1660     GST_DEBUG ("adding \"%s\" to \"%s\"'s group",
1661         GST_ELEMENT_NAME (element2), GST_ELEMENT_NAME (element1));
1662
1663     /* the second element also has a group, merge */
1664     if (group2)
1665       merge_groups (group1, group2);
1666     /* the second element has no group, add it to the group
1667      * of the first element */
1668     else
1669       add_to_group (group1, element2, TRUE);
1670
1671     group = group1;
1672   }
1673   /* element1 has no group, element2 does. Add element1 to the
1674    * group of element2 */
1675   else {
1676     GST_DEBUG ("adding \"%s\" to \"%s\"'s group",
1677         GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
1678     add_to_group (group2, element1, TRUE);
1679     group = group2;
1680   }
1681   return group;
1682 }
1683
1684 /*
1685  * increment link counts between groups -- it's important that src is actually
1686  * the src group, so we can introspect the topology later
1687  */
1688 static void
1689 group_inc_link (GstOptSchedulerGroup * src, GstOptSchedulerGroup * sink)
1690 {
1691   GSList *links = src->group_links;
1692   gboolean done = FALSE;
1693   GstOptSchedulerGroupLink *link;
1694
1695   /* first try to find a previous link */
1696   while (links && !done) {
1697     link = (GstOptSchedulerGroupLink *) links->data;
1698     links = g_slist_next (links);
1699
1700     if (IS_GROUP_LINK (link, src, sink)) {
1701       /* we found a link to this group, increment the link count */
1702       link->count++;
1703       GST_LOG ("incremented group link count between %p and %p to %d",
1704           src, sink, link->count);
1705       done = TRUE;
1706     }
1707   }
1708   if (!done) {
1709     /* no link was found, create a new one */
1710     link = g_new0 (GstOptSchedulerGroupLink, 1);
1711
1712     link->src = src;
1713     link->sink = sink;
1714     link->count = 1;
1715
1716     src->group_links = g_slist_prepend (src->group_links, link);
1717     sink->group_links = g_slist_prepend (sink->group_links, link);
1718
1719     src->sched->live_links++;
1720
1721     GST_DEBUG ("added group link between %p and %p, %d live links now",
1722         src, sink, src->sched->live_links);
1723   }
1724 }
1725
1726 /*
1727  * decrement link counts between groups, returns TRUE if the link count reaches
1728  * 0 -- note that the groups are not necessarily ordered as (src, sink) like
1729  * inc_link requires
1730  */
1731 static gboolean
1732 group_dec_link (GstOptSchedulerGroup * group1, GstOptSchedulerGroup * group2)
1733 {
1734   GSList *links = group1->group_links;
1735   gboolean res = FALSE;
1736   GstOptSchedulerGroupLink *link;
1737
1738   while (links) {
1739     link = (GstOptSchedulerGroupLink *) links->data;
1740     links = g_slist_next (links);
1741
1742     if (IS_GROUP_LINK (link, group1, group2)) {
1743       g_assert (link->count > 0);
1744       link->count--;
1745       GST_LOG ("link count between %p and %p is now %d",
1746           group1, group2, link->count);
1747       if (link->count == 0) {
1748         GstOptSchedulerGroup *iso_group = NULL;
1749
1750         group1->group_links = g_slist_remove (group1->group_links, link);
1751         group2->group_links = g_slist_remove (group2->group_links, link);
1752         group1->sched->live_links--;
1753
1754         GST_LOG ("%d live links now", group1->sched->live_links);
1755
1756         g_free (link);
1757         GST_DEBUG ("removed group link between %p and %p", group1, group2);
1758         if (group1->group_links == NULL) {
1759           /* group1 has no more links with other groups */
1760           iso_group = group1;
1761         } else if (group2->group_links == NULL) {
1762           /* group2 has no more links with other groups */
1763           iso_group = group2;
1764         }
1765         if (iso_group) {
1766           GstOptSchedulerChain *chain;
1767
1768           GST_DEBUG ("group %p has become isolated, moving to new chain",
1769               iso_group);
1770
1771           chain = create_chain (iso_group->chain->sched);
1772           remove_from_chain (iso_group->chain, iso_group);
1773           add_to_chain (chain, iso_group);
1774         }
1775         res = TRUE;
1776       }
1777       break;
1778     }
1779   }
1780   return res;
1781 }
1782
1783
1784 typedef enum
1785 {
1786   GST_OPT_INVALID,
1787   GST_OPT_GET_TO_CHAIN,
1788   GST_OPT_LOOP_TO_CHAIN,
1789   GST_OPT_GET_TO_LOOP,
1790   GST_OPT_CHAIN_TO_CHAIN,
1791   GST_OPT_CHAIN_TO_LOOP,
1792   GST_OPT_LOOP_TO_LOOP
1793 }
1794 LinkType;
1795
1796 /*
1797  * Entry points for this scheduler.
1798  */
1799 static void
1800 gst_opt_scheduler_setup (GstScheduler * sched)
1801 {
1802 #ifdef USE_COTHREADS
1803   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1804
1805   /* first create thread context */
1806   if (osched->context == NULL) {
1807     GST_DEBUG ("initializing cothread context");
1808     osched->context = do_cothread_context_init ();
1809   }
1810 #endif
1811 }
1812
1813 static void
1814 gst_opt_scheduler_reset (GstScheduler * sched)
1815 {
1816 #ifdef USE_COTHREADS
1817   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1818   GSList *chains = osched->chains;
1819
1820   while (chains) {
1821     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
1822     GSList *groups = chain->groups;
1823
1824     while (groups) {
1825       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
1826
1827       destroy_group_scheduler (group);
1828       groups = groups->next;
1829     }
1830     chains = chains->next;
1831   }
1832
1833   if (osched->context) {
1834     do_cothread_context_destroy (osched->context);
1835     osched->context = NULL;
1836   }
1837 #endif
1838 }
1839
1840 static void
1841 gst_opt_scheduler_add_element (GstScheduler * sched, GstElement * element)
1842 {
1843   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1844   GstOptSchedulerCtx *ctx;
1845   const GList *pads;
1846
1847   GST_DEBUG_OBJECT (sched, "adding element \"%s\"", GST_OBJECT_NAME (element));
1848
1849   /* decoupled elements are not added to the scheduler lists */
1850   if (GST_ELEMENT_IS_DECOUPLED (element))
1851     return;
1852
1853   ctx = g_new0 (GstOptSchedulerCtx, 1);
1854   GST_ELEMENT (element)->sched_private = ctx;
1855   ctx->flags = GST_OPT_SCHEDULER_CTX_DISABLED;
1856
1857   /* set event handler on all pads here so events work unconnected too;
1858    * in _link, it can be overruled if need be */
1859   /* FIXME: we should also do this when new pads on the element are created;
1860      but there are no hooks, so we do it again in _link */
1861   pads = gst_element_get_pad_list (element);
1862   while (pads) {
1863     GstPad *pad = GST_PAD (pads->data);
1864
1865     pads = g_list_next (pads);
1866
1867     if (!GST_IS_REAL_PAD (pad))
1868       continue;
1869     GST_RPAD_EVENTHANDLER (pad) = GST_RPAD_EVENTFUNC (pad);
1870   }
1871
1872   /* loop based elements *always* end up in their own group. It can eventually
1873    * be merged with another group when a link is made */
1874   if (element->loopfunc) {
1875     GstOptSchedulerGroup *group;
1876     GstOptSchedulerChain *chain;
1877
1878     chain = create_chain (osched);
1879
1880     group = create_group (chain, element, GST_OPT_SCHEDULER_GROUP_LOOP);
1881     group->entry = element;
1882
1883     GST_LOG ("added element \"%s\" as loop based entry",
1884         GST_ELEMENT_NAME (element));
1885   }
1886 }
1887
1888 static void
1889 gst_opt_scheduler_remove_element (GstScheduler * sched, GstElement * element)
1890 {
1891   GstOptSchedulerGroup *group;
1892
1893   GST_DEBUG_OBJECT (sched, "removing element \"%s\"",
1894       GST_OBJECT_NAME (element));
1895
1896   /* decoupled elements are not added to the scheduler lists and should therefore
1897    * not be removed */
1898   if (GST_ELEMENT_IS_DECOUPLED (element))
1899     return;
1900
1901   /* the element is guaranteed to live in it's own group/chain now */
1902   get_group (element, &group);
1903   if (group) {
1904     remove_from_group (group, element);
1905   }
1906
1907   g_free (GST_ELEMENT (element)->sched_private);
1908   GST_ELEMENT (element)->sched_private = NULL;
1909 }
1910
1911 static gboolean
1912 gst_opt_scheduler_yield (GstScheduler * sched, GstElement * element)
1913 {
1914 #ifdef USE_COTHREADS
1915   /* yield hands control to the main cothread context if the requesting 
1916    * element is the entry point of the group */
1917   GstOptSchedulerGroup *group;
1918
1919   get_group (element, &group);
1920   if (group && group->entry == element)
1921     do_cothread_switch (do_cothread_get_main (((GstOptScheduler *) sched)->
1922             context));
1923
1924   return FALSE;
1925 #else
1926   g_warning ("element %s performs a yield, please fix the element",
1927       GST_ELEMENT_NAME (element));
1928   return TRUE;
1929 #endif
1930 }
1931
1932 static gboolean
1933 gst_opt_scheduler_interrupt (GstScheduler * sched, GstElement * element)
1934 {
1935   GST_INFO ("interrupt from \"%s\"", GST_OBJECT_NAME (element));
1936
1937 #ifdef USE_COTHREADS
1938   do_cothread_switch (do_cothread_get_main (((GstOptScheduler *) sched)->
1939           context));
1940   return FALSE;
1941 #else
1942   {
1943     GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1944
1945     GST_INFO ("scheduler set interrupted state");
1946     osched->state = GST_OPT_SCHEDULER_STATE_INTERRUPTED;
1947   }
1948   return TRUE;
1949 #endif
1950 }
1951
1952 static void
1953 gst_opt_scheduler_error (GstScheduler * sched, GstElement * element)
1954 {
1955   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1956   GstOptSchedulerGroup *group;
1957
1958   get_group (element, &group);
1959   if (group)
1960     group_error_handler (group);
1961
1962   osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
1963 }
1964
1965 /* link pads, merge groups and chains */
1966 static void
1967 gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
1968     GstPad * sinkpad)
1969 {
1970   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1971   LinkType type = GST_OPT_INVALID;
1972   GstElement *src_element, *sink_element;
1973
1974   GST_INFO ("scheduling link between %s:%s and %s:%s",
1975       GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
1976
1977   src_element = GST_PAD_PARENT (srcpad);
1978   sink_element = GST_PAD_PARENT (sinkpad);
1979
1980   /* first we need to figure out what type of link we're dealing
1981    * with */
1982   if (src_element->loopfunc && sink_element->loopfunc)
1983     type = GST_OPT_LOOP_TO_LOOP;
1984   else {
1985     if (src_element->loopfunc) {
1986       if (GST_RPAD_CHAINFUNC (sinkpad))
1987         type = GST_OPT_LOOP_TO_CHAIN;
1988     } else if (sink_element->loopfunc) {
1989       if (GST_RPAD_GETFUNC (srcpad)) {
1990         type = GST_OPT_GET_TO_LOOP;
1991         /* this could be tricky, the get based source could 
1992          * already be part of a loop based group in another pad,
1993          * we assert on that for now */
1994         if (GST_ELEMENT_SCHED_CONTEXT (src_element) != NULL &&
1995             GST_ELEMENT_SCHED_GROUP (src_element) != NULL) {
1996           GstOptSchedulerGroup *group = GST_ELEMENT_SCHED_GROUP (src_element);
1997
1998           /* if the loop based element is the entry point we're ok, if it
1999            * isn't then we have multiple loop based elements in this group */
2000           if (group->entry != sink_element) {
2001             g_error
2002                 ("internal error: cannot schedule get to loop in multi-loop based group");
2003             return;
2004           }
2005         }
2006       } else
2007         type = GST_OPT_CHAIN_TO_LOOP;
2008     } else {
2009       if (GST_RPAD_GETFUNC (srcpad) && GST_RPAD_CHAINFUNC (sinkpad)) {
2010         type = GST_OPT_GET_TO_CHAIN;
2011         /* the get based source could already be part of a loop 
2012          * based group in another pad, we assert on that for now */
2013         if (GST_ELEMENT_SCHED_CONTEXT (src_element) != NULL &&
2014             GST_ELEMENT_SCHED_GROUP (src_element) != NULL) {
2015           GstOptSchedulerGroup *group = GST_ELEMENT_SCHED_GROUP (src_element);
2016
2017           /* if the get based element is the entry point we're ok, if it
2018            * isn't then we have a mixed loop/chain based group */
2019           if (group->entry != src_element) {
2020             g_error ("internal error: cannot schedule get to chain "
2021                 "with mixed loop/chain based group");
2022             return;
2023           }
2024         }
2025       } else
2026         type = GST_OPT_CHAIN_TO_CHAIN;
2027     }
2028   }
2029
2030   /* since we can't set event handlers on pad creation after addition, it is
2031    * best we set all of them again to the default before linking */
2032   GST_RPAD_EVENTHANDLER (srcpad) = GST_RPAD_EVENTFUNC (srcpad);
2033   GST_RPAD_EVENTHANDLER (sinkpad) = GST_RPAD_EVENTFUNC (sinkpad);
2034
2035   /* for each link type, perform specific actions */
2036   switch (type) {
2037     case GST_OPT_GET_TO_CHAIN:
2038     {
2039       GstOptSchedulerGroup *group = NULL;
2040
2041       GST_LOG ("get to chain based link");
2042
2043       /* setup get/chain handlers */
2044       GST_RPAD_GETHANDLER (srcpad) = gst_pad_call_get_function;
2045       GST_RPAD_CHAINHANDLER (sinkpad) = gst_pad_call_chain_function;
2046
2047       /* the two elements should be put into the same group, 
2048        * this also means that they are in the same chain automatically */
2049       group = group_elements (osched, src_element, sink_element,
2050           GST_OPT_SCHEDULER_GROUP_GET);
2051
2052       /* if there is not yet an entry in the group, select the source
2053        * element as the entry point and mark the group as a get based
2054        * group */
2055       if (!group->entry) {
2056         group->entry = src_element;
2057         group->type = GST_OPT_SCHEDULER_GROUP_GET;
2058
2059         GST_DEBUG ("setting \"%s\" as entry point of _get-based group %p",
2060             GST_ELEMENT_NAME (src_element), group);
2061
2062         setup_group_scheduler (osched, group);
2063       }
2064       break;
2065     }
2066     case GST_OPT_LOOP_TO_CHAIN:
2067     case GST_OPT_CHAIN_TO_CHAIN:
2068       GST_LOG ("loop/chain to chain based link");
2069
2070       GST_RPAD_CHAINHANDLER (sinkpad) = gst_pad_call_chain_function;
2071
2072       /* the two elements should be put into the same group, this also means
2073        * that they are in the same chain automatically, in case of a loop-based
2074        * src_element, there will be a group for src_element and sink_element
2075        * will be added to it. In the case a new group is created, we can't know
2076        * the type so we pass UNKNOWN as an arg */
2077       group_elements (osched, src_element, sink_element,
2078           GST_OPT_SCHEDULER_GROUP_UNKNOWN);
2079       break;
2080     case GST_OPT_GET_TO_LOOP:
2081       GST_LOG ("get to loop based link");
2082
2083       GST_RPAD_GETHANDLER (srcpad) = gst_pad_call_get_function;
2084
2085       /* the two elements should be put into the same group, this also means
2086        * that they are in the same chain automatically, sink_element is
2087        * loop-based so it already has a group where src_element will be added
2088        * to */
2089       group_elements (osched, src_element, sink_element,
2090           GST_OPT_SCHEDULER_GROUP_LOOP);
2091       break;
2092     case GST_OPT_CHAIN_TO_LOOP:
2093     case GST_OPT_LOOP_TO_LOOP:
2094     {
2095       GstOptSchedulerGroup *group1, *group2;
2096
2097       GST_LOG ("chain/loop to loop based link");
2098
2099       GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_loop_wrapper;
2100       GST_RPAD_GETHANDLER (srcpad) = gst_opt_scheduler_get_wrapper;
2101       /* events on the srcpad have to be intercepted as we might need to
2102        * flush the buffer lists, so override the given eventfunc */
2103       GST_RPAD_EVENTHANDLER (srcpad) = gst_opt_scheduler_event_wrapper;
2104
2105       group1 = GST_ELEMENT_SCHED_GROUP (src_element);
2106       group2 = GST_ELEMENT_SCHED_GROUP (sink_element);
2107
2108       g_assert (group2 != NULL);
2109
2110       /* group2 is guaranteed to exist as it contains a loop-based element.
2111        * group1 only exists if src_element is linked to some other element */
2112       if (!group1) {
2113         /* create a new group for src_element as it cannot be merged into another group
2114          * here. we create the group in the same chain as the loop-based element. note
2115          * that creating a new group will also increment the links with other groups */
2116         GST_DEBUG ("creating new group for element %s",
2117             GST_ELEMENT_NAME (src_element));
2118         group1 =
2119             create_group (group2->chain, src_element,
2120             GST_OPT_SCHEDULER_GROUP_LOOP);
2121       } else {
2122         /* both elements are already in a group, make sure they are added to
2123          * the same chain */
2124         merge_chains (group1->chain, group2->chain);
2125         /* increment the group link counters */
2126         group_inc_link (group1, group2);
2127       }
2128       break;
2129     }
2130     case GST_OPT_INVALID:
2131       g_error ("(internal error) invalid element link, what are you doing?");
2132       break;
2133   }
2134 }
2135
2136 static void
2137 group_elements_set_visited (GstOptSchedulerGroup * group, gboolean visited)
2138 {
2139   GSList *elements;
2140
2141   for (elements = group->elements; elements; elements = g_slist_next (elements)) {
2142     GstElement *element = GST_ELEMENT (elements->data);
2143
2144     if (visited) {
2145       GST_ELEMENT_SET_VISITED (element);
2146     } else {
2147       GST_ELEMENT_UNSET_VISITED (element);
2148     }
2149   }
2150 }
2151
2152 static GList *
2153 element_get_reachables_func (GstElement * element, GstOptSchedulerGroup * group,
2154     GstPad * brokenpad)
2155 {
2156   GList *result = NULL;
2157   const GList *pads;
2158
2159   /* if no element or element not in group or been there, return NULL */
2160   if (element == NULL || !group_has_element (group, element) ||
2161       GST_ELEMENT_IS_VISITED (element))
2162     return NULL;
2163
2164   GST_ELEMENT_SET_VISITED (element);
2165
2166   result = g_list_prepend (result, element);
2167
2168   pads = gst_element_get_pad_list (element);
2169   while (pads) {
2170     GstPad *pad = GST_PAD (pads->data);
2171     GstPad *peer;
2172
2173     pads = g_list_next (pads);
2174
2175     /* we only operate on real pads and on the pad that is not broken */
2176     if (!GST_IS_REAL_PAD (pad) || pad == brokenpad)
2177       continue;
2178
2179     peer = GST_PAD_PEER (pad);
2180     if (!GST_IS_REAL_PAD (peer) || peer == brokenpad)
2181       continue;
2182
2183     if (peer) {
2184       GstElement *parent;
2185       GList *res;
2186
2187       parent = GST_PAD_PARENT (peer);
2188
2189       res = element_get_reachables_func (parent, group, brokenpad);
2190
2191       result = g_list_concat (result, res);
2192     }
2193   }
2194
2195   return result;
2196 }
2197
2198 static GList *
2199 element_get_reachables (GstElement * element, GstOptSchedulerGroup * group,
2200     GstPad * brokenpad)
2201 {
2202   GList *result;
2203
2204   /* reset visited flags */
2205   group_elements_set_visited (group, FALSE);
2206
2207   result = element_get_reachables_func (element, group, brokenpad);
2208
2209   /* and reset visited flags again */
2210   group_elements_set_visited (group, FALSE);
2211
2212   return result;
2213 }
2214
2215 /* 
2216  * checks if a target group is still reachable from the group without taking the broken
2217  * group link into account.
2218  */
2219 static gboolean
2220 group_can_reach_group (GstOptSchedulerGroup * group,
2221     GstOptSchedulerGroup * target)
2222 {
2223   gboolean reachable = FALSE;
2224   const GSList *links = group->group_links;
2225
2226   GST_LOG ("checking if group %p can reach %p", group, target);
2227
2228   /* seems like we found the target element */
2229   if (group == target) {
2230     GST_LOG ("found way to reach %p", target);
2231     return TRUE;
2232   }
2233
2234   /* if the group is marked as visited, we don't need to check here */
2235   if (GST_OPT_SCHEDULER_GROUP_IS_FLAG_SET (group,
2236           GST_OPT_SCHEDULER_GROUP_VISITED)) {
2237     GST_LOG ("already visited %p", group);
2238     return FALSE;
2239   }
2240
2241   /* mark group as visited */
2242   GST_OPT_SCHEDULER_GROUP_SET_FLAG (group, GST_OPT_SCHEDULER_GROUP_VISITED);
2243
2244   while (links && !reachable) {
2245     GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) links->data;
2246     GstOptSchedulerGroup *other;
2247
2248     links = g_slist_next (links);
2249
2250     /* find other group in this link */
2251     other = OTHER_GROUP_LINK (link, group);
2252
2253     GST_LOG ("found link from %p to %p, count %d", group, other, link->count);
2254
2255     /* check if we can reach the target recursively */
2256     reachable = group_can_reach_group (other, target);
2257   }
2258   /* unset the visited flag, note that this is not optimal as we might be checking
2259    * groups several times when they are reachable with a loop. An alternative would be
2260    * to not clear the group flag at this stage but clear all flags in the chain when
2261    * all groups are checked. */
2262   GST_OPT_SCHEDULER_GROUP_UNSET_FLAG (group, GST_OPT_SCHEDULER_GROUP_VISITED);
2263
2264   GST_LOG ("leaving group %p with %s", group, (reachable ? "TRUE" : "FALSE"));
2265
2266   return reachable;
2267 }
2268
2269 /*
2270  * Go through all the pads of the given element and decrement the links that
2271  * this group has with the group of the peer element.  This function is mainly used
2272  * to update the group connections before we remove the element from the group.
2273  */
2274 static void
2275 group_dec_links_for_element (GstOptSchedulerGroup * group, GstElement * element)
2276 {
2277   GList *l;
2278   GstPad *pad;
2279   GstOptSchedulerGroup *peer_group;
2280
2281   for (l = GST_ELEMENT_PADS (element); l; l = l->next) {
2282     pad = (GstPad *) l->data;
2283     if (GST_IS_REAL_PAD (pad) && GST_PAD_PEER (pad)) {
2284       get_group (GST_PAD_PARENT (GST_PAD_PEER (pad)), &peer_group);
2285       if (peer_group && peer_group != group)
2286         group_dec_link (group, peer_group);
2287     }
2288   }
2289 }
2290
2291 /*
2292  * Go through all the pads of the given element and increment the links that
2293  * this group has with the group of the peer element.  This function is mainly used
2294  * to update the group connections before we add the element to the group.
2295  */
2296 static void
2297 group_inc_links_for_element (GstOptSchedulerGroup * group, GstElement * element)
2298 {
2299   GList *l;
2300   GstPad *pad;
2301   GstOptSchedulerGroup *peer_group;
2302
2303   GST_DEBUG ("group %p, element %s ", group, gst_element_get_name (element));
2304
2305   for (l = GST_ELEMENT_PADS (element); l; l = l->next) {
2306     pad = (GstPad *) l->data;
2307     if (GST_IS_REAL_PAD (pad) && GST_PAD_PEER (pad)) {
2308       get_group (GST_PAD_PARENT (GST_PAD_PEER (pad)), &peer_group);
2309       if (peer_group && peer_group != group)
2310         if (GST_PAD_DIRECTION (pad) == GST_PAD_SRC)
2311           group_inc_link (group, peer_group);
2312         else
2313           group_inc_link (peer_group, group);
2314     }
2315   }
2316 }
2317
2318 static void
2319 debug_element (GstElement * element, GstOptScheduler * osched)
2320 {
2321   GST_LOG ("element %s", gst_element_get_name (element));
2322 }
2323
2324 /* move this group in the chain of the groups it has links with */
2325 static void
2326 rechain_group (GstOptSchedulerGroup * group)
2327 {
2328   GstOptSchedulerChain *chain = NULL;
2329   GSList *links;
2330
2331   GST_LOG ("checking if this group needs rechaining");
2332
2333   /* follow all links */
2334   for (links = group->group_links; links; links = g_slist_next (links)) {
2335     GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) links->data;
2336     GstOptSchedulerGroup *other;
2337
2338     other = OTHER_GROUP_LINK (link, group);
2339     GST_LOG ("found link with other group %p with chain %p", other,
2340         other->chain);
2341
2342     /* first time, take chain */
2343     if (chain == NULL) {
2344       chain = other->chain;
2345     }
2346     /* second time, chain should be the same */
2347     else if (other->chain != chain) {
2348       g_warning ("(internal error): chain inconsistency");
2349     }
2350   }
2351   if (!chain) {
2352     GST_LOG ("no new chain found, not rechaining");
2353   } else if (chain != group->chain) {
2354     GST_LOG ("need to move group %p to chain %p", group, chain);
2355     /* remove from old chain */
2356     remove_from_chain (group->chain, group);
2357     /* and move to new chain */
2358     add_to_chain (chain, group);
2359   } else {
2360     GST_LOG ("group %p is in correct chain %p", group, chain);
2361   }
2362 }
2363
2364 /* migrate the element and all connected elements to a new group without looking at
2365  * the brokenpad */
2366 static GstOptSchedulerGroup *
2367 group_migrate_connected (GstOptScheduler * osched, GstElement * element,
2368     GstOptSchedulerGroup * group, GstPad * brokenpad)
2369 {
2370   GList *connected, *c;
2371   GstOptSchedulerGroup *new_group = NULL, *tst;
2372   GstOptSchedulerChain *chain;
2373   gint len;
2374
2375   if (GST_ELEMENT_IS_DECOUPLED (element)) {
2376     GST_LOG ("element is decoupled and thus not in the group");
2377     /* the element is decoupled and is therefore not in the group */
2378     return NULL;
2379   }
2380
2381   get_group (element, &tst);
2382   if (tst == NULL) {
2383     GST_LOG ("element has no group, not interesting");
2384     return NULL;
2385   }
2386
2387   GST_LOG ("migrate connected elements to new group");
2388   connected = element_get_reachables (element, group, brokenpad);
2389   GST_LOG ("elements to move to new group:");
2390   g_list_foreach (connected, (GFunc) debug_element, NULL);
2391
2392   len = g_list_length (connected);
2393
2394   if (len == 0) {
2395     g_warning ("(internal error) found lost element %s",
2396         gst_element_get_name (element));
2397     return NULL;
2398   } else if (len == 1) {
2399     group = remove_from_group (group, GST_ELEMENT (connected->data));
2400     GST_LOG
2401         ("not migrating to new group as the group would only contain 1 element");
2402     g_list_free (connected);
2403     GST_LOG ("new group is old group now");
2404     new_group = group;
2405   } else {
2406     /* we create a new chain to hold the new group */
2407     chain = create_chain (osched);
2408
2409     for (c = connected; c; c = g_list_next (c)) {
2410       GstElement *element = GST_ELEMENT (c->data);
2411
2412       group = remove_from_group (group, element);
2413       if (new_group == NULL) {
2414         new_group =
2415             create_group (chain, element, GST_OPT_SCHEDULER_GROUP_UNKNOWN);
2416       } else {
2417         add_to_group (new_group, element, TRUE);
2418       }
2419     }
2420     g_list_free (connected);
2421
2422     /* remove last element from the group if any. Make sure not to remove
2423      * the loop based entry point of a group as this always needs one group */
2424     if (group != NULL) {
2425       if (group_num_elements (group) == 1 &&
2426           group->type != GST_OPT_SCHEDULER_GROUP_LOOP) {
2427         GST_LOG ("removing last element from old group");
2428         group = remove_from_group (group, GST_ELEMENT (group->elements->data));
2429       }
2430     }
2431   }
2432
2433   if (new_group != NULL) {
2434     if (group_num_elements (new_group) == 1 &&
2435         new_group->type != GST_OPT_SCHEDULER_GROUP_LOOP) {
2436       GST_LOG ("removing last element from new group");
2437       new_group =
2438           remove_from_group (new_group,
2439           GST_ELEMENT (new_group->elements->data));
2440       return NULL;
2441     }
2442     /* at this point the new group lives in its own chain but might
2443      * have to be merged with another chain, this happens when the new
2444      * group has a link with another group in another chain */
2445     rechain_group (new_group);
2446   }
2447
2448
2449   return new_group;
2450 }
2451
2452 static void
2453 gst_opt_scheduler_pad_unlink (GstScheduler * sched,
2454     GstPad * srcpad, GstPad * sinkpad)
2455 {
2456   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2457   GstElement *src_element, *sink_element;
2458   GstOptSchedulerGroup *group1, *group2;
2459
2460   GST_INFO ("unscheduling link between %s:%s and %s:%s",
2461       GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
2462
2463   src_element = GST_PAD_PARENT (srcpad);
2464   sink_element = GST_PAD_PARENT (sinkpad);
2465
2466   get_group (src_element, &group1);
2467   get_group (sink_element, &group2);
2468
2469   /* for decoupled elements (that are never put into a group) we use the
2470    * group of the peer element for the remainder of the algorithm */
2471   if (GST_ELEMENT_IS_DECOUPLED (src_element)) {
2472     group1 = group2;
2473   }
2474   if (GST_ELEMENT_IS_DECOUPLED (sink_element)) {
2475     group2 = group1;
2476   }
2477
2478   /* if one the elements has no group (anymore) we don't really care 
2479    * about the link */
2480   if (!group1 || !group2) {
2481     GST_LOG
2482         ("one (or both) of the elements is not in a group, not interesting");
2483     return;
2484   }
2485
2486   /* easy part, groups are different */
2487   if (group1 != group2) {
2488     gboolean zero;
2489
2490     GST_LOG ("elements are in different groups");
2491
2492     /* we can remove the links between the groups now */
2493     zero = group_dec_link (group1, group2);
2494
2495     /* if the groups are not directly connected anymore, we have to perform a
2496      * recursive check to see if they are really unlinked */
2497     if (zero) {
2498       gboolean still_link;
2499       GstOptSchedulerChain *chain;
2500
2501       /* see if group1 and group2 are still connected in any indirect way */
2502       still_link = group_can_reach_group (group1, group2);
2503
2504       GST_DEBUG ("group %p %s reach group %p", group1,
2505           (still_link ? "can" : "can't"), group2);
2506       if (!still_link) {
2507         /* groups are really disconnected, migrate one group to a new chain */
2508         chain = create_chain (osched);
2509         chain_recursively_migrate_group (chain, group1);
2510
2511         GST_DEBUG ("migrated group %p to new chain %p", group1, chain);
2512       }
2513     } else {
2514       GST_DEBUG ("group %p still has direct link with group %p", group1,
2515           group2);
2516     }
2517   }
2518   /* hard part, groups are equal */
2519   else {
2520     GstOptSchedulerGroup *group;
2521
2522     /* since group1 == group2, it doesn't matter which group we take */
2523     group = group1;
2524
2525     GST_LOG ("elements are in the same group %p", group);
2526
2527     if (group->entry == NULL) {
2528       /* it doesn't really matter, we just have to make sure that both 
2529        * elements end up in another group if they are not connected */
2530       GST_LOG ("group %p has no entry, moving source element to new group",
2531           group);
2532       group_migrate_connected (osched, src_element, group, srcpad);
2533     } else {
2534       GList *reachables;
2535
2536       GST_LOG ("group %p has entry %p", group, group->entry);
2537
2538       /* get of a list of all elements that are still managed by the old
2539        * entry element */
2540       reachables = element_get_reachables (group->entry, group, srcpad);
2541       GST_LOG ("elements still reachable from the entry:");
2542       g_list_foreach (reachables, (GFunc) debug_element, sched);
2543
2544       /* if the source is reachable from the entry, we can leave it in the group */
2545       if (g_list_find (reachables, src_element)) {
2546         GST_LOG
2547             ("source element still reachable from the entry, leaving in group");
2548       } else {
2549         GST_LOG
2550             ("source element not reachable from the entry, moving to new group");
2551         group_migrate_connected (osched, src_element, group, srcpad);
2552       }
2553
2554       /* if the sink is reachable from the entry, we can leave it in the group */
2555       if (g_list_find (reachables, sink_element)) {
2556         GST_LOG
2557             ("sink element still reachable from the entry, leaving in group");
2558       } else {
2559         GST_LOG
2560             ("sink element not reachable from the entry, moving to new group");
2561         group_migrate_connected (osched, sink_element, group, srcpad);
2562       }
2563       g_list_free (reachables);
2564     }
2565     /* at this point the group can be freed and gone, so don't touch */
2566   }
2567 }
2568
2569 /* a scheduler iteration is done by looping and scheduling the active chains */
2570 static GstSchedulerState
2571 gst_opt_scheduler_iterate (GstScheduler * sched)
2572 {
2573   GstSchedulerState state = GST_SCHEDULER_STATE_STOPPED;
2574   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2575   gint iterations = osched->iterations;
2576
2577   osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
2578
2579   GST_DEBUG_OBJECT (sched, "iterating");
2580
2581   while (iterations) {
2582     gboolean scheduled = FALSE;
2583     GSList *chains;
2584
2585     /* we have to schedule each of the scheduler chains now */
2586     chains = osched->chains;
2587     while (chains) {
2588       GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
2589
2590       ref_chain (chain);
2591       /* if the chain is not disabled, schedule it */
2592       if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
2593         GST_LOG ("scheduling chain %p", chain);
2594         schedule_chain (chain);
2595         scheduled = TRUE;
2596         GST_LOG ("scheduled chain %p", chain);
2597       } else {
2598         GST_LOG ("not scheduling disabled chain %p", chain);
2599       }
2600
2601       /* don't schedule any more chains when in error */
2602       if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
2603         GST_ERROR_OBJECT (sched, "in error state");
2604         break;
2605       } else if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
2606         GST_DEBUG_OBJECT (osched, "got interrupted, continue with next chain");
2607         osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
2608       }
2609
2610       chains = g_slist_next (chains);
2611       unref_chain (chain);
2612     }
2613
2614     /* at this point it's possible that the scheduler state is
2615      * in error, we then return an error */
2616     if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
2617       state = GST_SCHEDULER_STATE_ERROR;
2618       break;
2619     } else {
2620       /* if chains were scheduled, return our current state */
2621       if (scheduled)
2622         state = GST_SCHEDULER_STATE (sched);
2623       /* if no chains were scheduled, we say we are stopped */
2624       else {
2625         state = GST_SCHEDULER_STATE_STOPPED;
2626         break;
2627       }
2628     }
2629     if (iterations > 0)
2630       iterations--;
2631   }
2632
2633   return state;
2634 }
2635
2636
2637 static void
2638 gst_opt_scheduler_show (GstScheduler * sched)
2639 {
2640   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2641   GSList *chains;
2642
2643   g_print ("iterations:    %d\n", osched->iterations);
2644   g_print ("max recursion: %d\n", osched->max_recursion);
2645
2646   chains = osched->chains;
2647   while (chains) {
2648     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
2649     GSList *groups = chain->groups;
2650
2651     chains = g_slist_next (chains);
2652
2653     g_print ("+- chain %p: refcount %d, %d groups, %d enabled, flags %d\n",
2654         chain, chain->refcount, chain->num_groups, chain->num_enabled,
2655         chain->flags);
2656
2657     while (groups) {
2658       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
2659       GSList *elements = group->elements;
2660       GSList *group_links = group->group_links;
2661
2662       groups = g_slist_next (groups);
2663
2664       g_print
2665           (" +- group %p: refcount %d, %d elements, %d enabled, flags %d, entry %s, %s\n",
2666           group, group->refcount, group->num_elements, group->num_enabled,
2667           group->flags,
2668           (group->entry ? GST_ELEMENT_NAME (group->entry) : "(none)"),
2669           (group->type ==
2670               GST_OPT_SCHEDULER_GROUP_GET ? "get-based" : "loop-based"));
2671
2672       while (elements) {
2673         GstElement *element = (GstElement *) elements->data;
2674
2675         elements = g_slist_next (elements);
2676
2677         g_print ("  +- element %s\n", GST_ELEMENT_NAME (element));
2678       }
2679       while (group_links) {
2680         GstOptSchedulerGroupLink *link =
2681             (GstOptSchedulerGroupLink *) group_links->data;
2682
2683         group_links = g_slist_next (group_links);
2684
2685         g_print ("group link %p between %p and %p, count %d\n",
2686             link, link->src, link->sink, link->count);
2687       }
2688     }
2689   }
2690 }
2691
2692 static void
2693 gst_opt_scheduler_get_property (GObject * object, guint prop_id,
2694     GValue * value, GParamSpec * pspec)
2695 {
2696   GstOptScheduler *osched;
2697
2698   g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
2699
2700   osched = GST_OPT_SCHEDULER (object);
2701
2702   switch (prop_id) {
2703     case ARG_ITERATIONS:
2704       g_value_set_int (value, osched->iterations);
2705       break;
2706     case ARG_MAX_RECURSION:
2707       g_value_set_int (value, osched->max_recursion);
2708       break;
2709     default:
2710       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2711       break;
2712   }
2713 }
2714
2715 static void
2716 gst_opt_scheduler_set_property (GObject * object, guint prop_id,
2717     const GValue * value, GParamSpec * pspec)
2718 {
2719   GstOptScheduler *osched;
2720
2721   g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
2722
2723   osched = GST_OPT_SCHEDULER (object);
2724
2725   switch (prop_id) {
2726     case ARG_ITERATIONS:
2727       osched->iterations = g_value_get_int (value);
2728       break;
2729     case ARG_MAX_RECURSION:
2730       osched->max_recursion = g_value_get_int (value);
2731       break;
2732     default:
2733       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2734       break;
2735   }
2736 }