fix SIGBUS on opt, #145338
[platform/upstream/gstreamer.git] / gst / schedulers / gstoptimalscheduler.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstscheduler.c: Default scheduling code for most cases
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #include <gst/gst.h>
28
29 GST_DEBUG_CATEGORY_STATIC (debug_scheduler);
30 #define GST_CAT_DEFAULT debug_scheduler
31
32 #ifdef USE_COTHREADS
33 # include "cothreads_compat.h"
34 #else
35 # define COTHREADS_NAME_CAPITAL ""
36 # define COTHREADS_NAME         ""
37 #endif
38
39 #define GST_ELEMENT_SCHED_CONTEXT(elem)         ((GstOptSchedulerCtx*) (GST_ELEMENT (elem)->sched_private))
40 #define GST_ELEMENT_SCHED_GROUP(elem)           (GST_ELEMENT_SCHED_CONTEXT (elem)->group)
41 /* need this first macro to not run into lvalue casts */
42 #define GST_PAD_BUFPEN(pad)                     (GST_REAL_PAD(pad)->sched_private)
43 #define GST_PAD_BUFLIST(pad)                    ((GList*) GST_PAD_BUFPEN(pad))
44
45 #define GST_ELEMENT_COTHREAD_STOPPING                   GST_ELEMENT_SCHEDULER_PRIVATE1
46 #define GST_ELEMENT_IS_COTHREAD_STOPPING(element)       GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
47 #define GST_ELEMENT_INTERRUPTED                         GST_ELEMENT_SCHEDULER_PRIVATE2
48 #define GST_ELEMENT_IS_INTERRUPTED(element)             GST_FLAG_IS_SET((element), GST_ELEMENT_INTERRUPTED)
49
50 typedef struct _GstOptScheduler GstOptScheduler;
51 typedef struct _GstOptSchedulerClass GstOptSchedulerClass;
52
53 #define GST_TYPE_OPT_SCHEDULER \
54   (gst_opt_scheduler_get_type())
55 #define GST_OPT_SCHEDULER(obj) \
56   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_OPT_SCHEDULER,GstOptScheduler))
57 #define GST_OPT_SCHEDULER_CLASS(klass) \
58   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_OPT_SCHEDULER,GstOptSchedulerClass))
59 #define GST_IS_OPT_SCHEDULER(obj) \
60   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_OPT_SCHEDULER))
61 #define GST_IS_OPT_SCHEDULER_CLASS(obj) \
62   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_OPT_SCHEDULER))
63
64 typedef enum
65 {
66   GST_OPT_SCHEDULER_STATE_NONE,
67   GST_OPT_SCHEDULER_STATE_STOPPED,
68   GST_OPT_SCHEDULER_STATE_ERROR,
69   GST_OPT_SCHEDULER_STATE_RUNNING,
70   GST_OPT_SCHEDULER_STATE_INTERRUPTED
71 }
72 GstOptSchedulerState;
73
74 struct _GstOptScheduler
75 {
76   GstScheduler parent;
77
78   GstOptSchedulerState state;
79
80 #ifdef USE_COTHREADS
81   cothread_context *context;
82 #endif
83   gint iterations;
84
85   GSList *elements;
86   GSList *chains;
87
88   GList *runqueue;
89   gint recursion;
90
91   gint max_recursion;
92 };
93
94 struct _GstOptSchedulerClass
95 {
96   GstSchedulerClass parent_class;
97 };
98
99 static GType _gst_opt_scheduler_type = 0;
100
101 typedef enum
102 {
103   GST_OPT_SCHEDULER_CHAIN_DIRTY = (1 << 1),
104   GST_OPT_SCHEDULER_CHAIN_DISABLED = (1 << 2),
105   GST_OPT_SCHEDULER_CHAIN_RUNNING = (1 << 3)
106 }
107 GstOptSchedulerChainFlags;
108
109 #define GST_OPT_SCHEDULER_CHAIN_SET_DIRTY(chain)        ((chain)->flags |= GST_OPT_SCHEDULER_CHAIN_DIRTY)
110 #define GST_OPT_SCHEDULER_CHAIN_SET_CLEAN(chain)        ((chain)->flags &= ~GST_OPT_SCHEDULER_CHAIN_DIRTY)
111 #define GST_OPT_SCHEDULER_CHAIN_IS_DIRTY(chain)         ((chain)->flags & GST_OPT_SCHEDULER_CHAIN_DIRTY)
112
113 #define GST_OPT_SCHEDULER_CHAIN_DISABLE(chain)          ((chain)->flags |= GST_OPT_SCHEDULER_CHAIN_DISABLED)
114 #define GST_OPT_SCHEDULER_CHAIN_ENABLE(chain)           ((chain)->flags &= ~GST_OPT_SCHEDULER_CHAIN_DISABLED)
115 #define GST_OPT_SCHEDULER_CHAIN_IS_DISABLED(chain)      ((chain)->flags & GST_OPT_SCHEDULER_CHAIN_DISABLED)
116
117 typedef struct _GstOptSchedulerChain GstOptSchedulerChain;
118
119 struct _GstOptSchedulerChain
120 {
121   gint refcount;
122
123   GstOptScheduler *sched;
124
125   GstOptSchedulerChainFlags flags;
126
127   GSList *groups;               /* the groups in this chain */
128   gint num_groups;
129   gint num_enabled;
130 };
131
132 /* 
133  * elements that are scheduled in one cothread 
134  */
135 typedef enum
136 {
137   GST_OPT_SCHEDULER_GROUP_DIRTY = (1 << 1),     /* this group has been modified */
138   GST_OPT_SCHEDULER_GROUP_COTHREAD_STOPPING = (1 << 2), /* the group's cothread stops after one iteration */
139   GST_OPT_SCHEDULER_GROUP_DISABLED = (1 << 3),  /* this group is disabled */
140   GST_OPT_SCHEDULER_GROUP_RUNNING = (1 << 4),   /* this group is running */
141   GST_OPT_SCHEDULER_GROUP_SCHEDULABLE = (1 << 5),       /* this group is schedulable */
142   GST_OPT_SCHEDULER_GROUP_VISITED = (1 << 6)    /* this group is visited when finding links */
143 }
144 GstOptSchedulerGroupFlags;
145
146 typedef enum
147 {
148   GST_OPT_SCHEDULER_GROUP_UNKNOWN = 3,
149   GST_OPT_SCHEDULER_GROUP_GET = 1,
150   GST_OPT_SCHEDULER_GROUP_LOOP = 2
151 }
152 GstOptSchedulerGroupType;
153
154 #define GST_OPT_SCHEDULER_GROUP_SET_FLAG(group,flag)    ((group)->flags |= (flag))
155 #define GST_OPT_SCHEDULER_GROUP_UNSET_FLAG(group,flag)  ((group)->flags &= ~(flag))
156 #define GST_OPT_SCHEDULER_GROUP_IS_FLAG_SET(group,flag) ((group)->flags & (flag))
157
158 #define GST_OPT_SCHEDULER_GROUP_DISABLE(group)          ((group)->flags |= GST_OPT_SCHEDULER_GROUP_DISABLED)
159 #define GST_OPT_SCHEDULER_GROUP_ENABLE(group)           ((group)->flags &= ~GST_OPT_SCHEDULER_GROUP_DISABLED)
160 #define GST_OPT_SCHEDULER_GROUP_IS_ENABLED(group)       (!((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED))
161 #define GST_OPT_SCHEDULER_GROUP_IS_DISABLED(group)      ((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED)
162
163
164 typedef struct _GstOptSchedulerGroup GstOptSchedulerGroup;
165 typedef struct _GstOptSchedulerGroupLink GstOptSchedulerGroupLink;
166
167 /* used to keep track of links with other groups */
168 struct _GstOptSchedulerGroupLink
169 {
170   GstOptSchedulerGroup *src;    /* the group we are linked with */
171   GstOptSchedulerGroup *sink;   /* the group we are linked with */
172   gint count;                   /* the number of links with the group */
173 };
174
175 #define IS_GROUP_LINK(link, srcg, sinkg)        ((link->src == srcg && link->sink == sinkg) || \
176                                                  (link->sink == srcg && link->src == sinkg))
177 #define OTHER_GROUP_LINK(link, group)           (link->src == group ? link->sink : link->src)
178
179 typedef int (*GroupScheduleFunction) (int argc, char *argv[]);
180
181 struct _GstOptSchedulerGroup
182 {
183   GstOptSchedulerChain *chain;  /* the chain this group belongs to */
184   GstOptSchedulerGroupFlags flags;      /* flags for this group */
185   GstOptSchedulerGroupType type;        /* flags for this group */
186
187   gint refcount;
188
189   GSList *elements;             /* elements of this group */
190   gint num_elements;
191   gint num_enabled;
192   GstElement *entry;            /* the group's entry point */
193
194   GSList *group_links;          /* other groups that are linked with this group */
195
196 #ifdef USE_COTHREADS
197   cothread *cothread;           /* the cothread of this group */
198 #else
199   GroupScheduleFunction schedulefunc;
200 #endif
201   int argc;
202   char **argv;
203 };
204
205
206 /* 
207  * A group is a set of elements through which data can flow without switching
208  * cothreads or without invoking the scheduler's run queue.
209  */
210 static GstOptSchedulerGroup *ref_group (GstOptSchedulerGroup * group);
211 static GstOptSchedulerGroup *unref_group (GstOptSchedulerGroup * group);
212 static GstOptSchedulerGroup *create_group (GstOptSchedulerChain * chain,
213     GstElement * element, GstOptSchedulerGroupType type);
214 static void destroy_group (GstOptSchedulerGroup * group);
215 static GstOptSchedulerGroup *add_to_group (GstOptSchedulerGroup * group,
216     GstElement * element, gboolean with_links);
217 static GstOptSchedulerGroup *remove_from_group (GstOptSchedulerGroup * group,
218     GstElement * element);
219 static void group_dec_links_for_element (GstOptSchedulerGroup * group,
220     GstElement * element);
221 static void group_inc_links_for_element (GstOptSchedulerGroup * group,
222     GstElement * element);
223 static GstOptSchedulerGroup *merge_groups (GstOptSchedulerGroup * group1,
224     GstOptSchedulerGroup * group2);
225 static void setup_group_scheduler (GstOptScheduler * osched,
226     GstOptSchedulerGroup * group);
227 static void destroy_group_scheduler (GstOptSchedulerGroup * group);
228 static void group_error_handler (GstOptSchedulerGroup * group);
229 static void group_element_set_enabled (GstOptSchedulerGroup * group,
230     GstElement * element, gboolean enabled);
231 static gboolean schedule_group (GstOptSchedulerGroup * group);
232
233
234 /* 
235  * A chain is a set of groups that are linked to each other.
236  */
237 static void destroy_chain (GstOptSchedulerChain * chain);
238 static GstOptSchedulerChain *create_chain (GstOptScheduler * osched);
239 static GstOptSchedulerChain *ref_chain (GstOptSchedulerChain * chain);
240 static GstOptSchedulerChain *unref_chain (GstOptSchedulerChain * chain);
241 static GstOptSchedulerChain *add_to_chain (GstOptSchedulerChain * chain,
242     GstOptSchedulerGroup * group);
243 static GstOptSchedulerChain *remove_from_chain (GstOptSchedulerChain * chain,
244     GstOptSchedulerGroup * group);
245 static GstOptSchedulerChain *merge_chains (GstOptSchedulerChain * chain1,
246     GstOptSchedulerChain * chain2);
247 static void chain_recursively_migrate_group (GstOptSchedulerChain * chain,
248     GstOptSchedulerGroup * group);
249 static void chain_group_set_enabled (GstOptSchedulerChain * chain,
250     GstOptSchedulerGroup * group, gboolean enabled);
251 static void schedule_chain (GstOptSchedulerChain * chain);
252
253
254 /*
255  * The schedule functions are the entry points for cothreads, or called directly
256  * by gst_opt_scheduler_schedule_run_queue
257  */
258 static int get_group_schedule_function (int argc, char *argv[]);
259 static int loop_group_schedule_function (int argc, char *argv[]);
260 static int unknown_group_schedule_function (int argc, char *argv[]);
261
262
263 /*
264  * These wrappers are set on the pads as the chain handler (what happens when
265  * gst_pad_push is called) or get handler (for gst_pad_pull).
266  */
267 static void gst_opt_scheduler_loop_wrapper (GstPad * sinkpad, GstData * data);
268 static GstData *gst_opt_scheduler_get_wrapper (GstPad * srcpad);
269
270
271 /*
272  * Without cothreads, gst_pad_push or gst_pad_pull on a loop-based group will
273  * just queue the peer element on a list. We need to actually run the queue
274  * instead of relying on cothreads to do the switch for us.
275  */
276 #ifndef USE_COTHREADS
277 static void gst_opt_scheduler_schedule_run_queue (GstOptScheduler * osched);
278 #endif
279
280
281 /* 
282  * Scheduler private data for an element 
283  */
284 typedef struct _GstOptSchedulerCtx GstOptSchedulerCtx;
285
286 typedef enum
287 {
288   GST_OPT_SCHEDULER_CTX_DISABLED = (1 << 1)     /* the element is disabled */
289 }
290 GstOptSchedulerCtxFlags;
291
292 struct _GstOptSchedulerCtx
293 {
294   GstOptSchedulerGroup *group;  /* the group this element belongs to */
295
296   GstOptSchedulerCtxFlags flags;        /* flags for this element */
297 };
298
299
300 /*
301  * Implementation of GstScheduler
302  */
303 enum
304 {
305   ARG_0,
306   ARG_ITERATIONS,
307   ARG_MAX_RECURSION
308 };
309
310 static void gst_opt_scheduler_class_init (GstOptSchedulerClass * klass);
311 static void gst_opt_scheduler_init (GstOptScheduler * scheduler);
312
313 static void gst_opt_scheduler_set_property (GObject * object, guint prop_id,
314     const GValue * value, GParamSpec * pspec);
315 static void gst_opt_scheduler_get_property (GObject * object, guint prop_id,
316     GValue * value, GParamSpec * pspec);
317
318 static void gst_opt_scheduler_dispose (GObject * object);
319
320 static void gst_opt_scheduler_setup (GstScheduler * sched);
321 static void gst_opt_scheduler_reset (GstScheduler * sched);
322 static void gst_opt_scheduler_add_element (GstScheduler * sched,
323     GstElement * element);
324 static void gst_opt_scheduler_remove_element (GstScheduler * sched,
325     GstElement * element);
326 static GstElementStateReturn gst_opt_scheduler_state_transition (GstScheduler *
327     sched, GstElement * element, gint transition);
328 static void gst_opt_scheduler_scheduling_change (GstScheduler * sched,
329     GstElement * element);
330 static gboolean gst_opt_scheduler_yield (GstScheduler * sched,
331     GstElement * element);
332 static gboolean gst_opt_scheduler_interrupt (GstScheduler * sched,
333     GstElement * element);
334 static void gst_opt_scheduler_error (GstScheduler * sched,
335     GstElement * element);
336 static void gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
337     GstPad * sinkpad);
338 static void gst_opt_scheduler_pad_unlink (GstScheduler * sched, GstPad * srcpad,
339     GstPad * sinkpad);
340 static GstSchedulerState gst_opt_scheduler_iterate (GstScheduler * sched);
341
342 static void gst_opt_scheduler_show (GstScheduler * sched);
343
344 static GstSchedulerClass *parent_class = NULL;
345
346 static GType
347 gst_opt_scheduler_get_type (void)
348 {
349   if (!_gst_opt_scheduler_type) {
350     static const GTypeInfo scheduler_info = {
351       sizeof (GstOptSchedulerClass),
352       NULL,
353       NULL,
354       (GClassInitFunc) gst_opt_scheduler_class_init,
355       NULL,
356       NULL,
357       sizeof (GstOptScheduler),
358       0,
359       (GInstanceInitFunc) gst_opt_scheduler_init,
360       NULL
361     };
362
363     _gst_opt_scheduler_type = g_type_register_static (GST_TYPE_SCHEDULER,
364         "GstOpt" COTHREADS_NAME_CAPITAL "Scheduler", &scheduler_info, 0);
365   }
366   return _gst_opt_scheduler_type;
367 }
368
369 static void
370 gst_opt_scheduler_class_init (GstOptSchedulerClass * klass)
371 {
372   GObjectClass *gobject_class;
373   GstObjectClass *gstobject_class;
374   GstSchedulerClass *gstscheduler_class;
375
376   gobject_class = (GObjectClass *) klass;
377   gstobject_class = (GstObjectClass *) klass;
378   gstscheduler_class = (GstSchedulerClass *) klass;
379
380   parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
381
382   gobject_class->set_property =
383       GST_DEBUG_FUNCPTR (gst_opt_scheduler_set_property);
384   gobject_class->get_property =
385       GST_DEBUG_FUNCPTR (gst_opt_scheduler_get_property);
386   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_opt_scheduler_dispose);
387
388   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ITERATIONS,
389       g_param_spec_int ("iterations", "Iterations",
390           "Number of groups to schedule in one iteration (-1 == until EOS/error)",
391           -1, G_MAXINT, 1, G_PARAM_READWRITE));
392 #ifndef USE_COTHREADS
393   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_RECURSION,
394       g_param_spec_int ("max_recursion", "Max recursion",
395           "Maximum number of recursions", 1, G_MAXINT, 100, G_PARAM_READWRITE));
396 #endif
397
398   gstscheduler_class->setup = GST_DEBUG_FUNCPTR (gst_opt_scheduler_setup);
399   gstscheduler_class->reset = GST_DEBUG_FUNCPTR (gst_opt_scheduler_reset);
400   gstscheduler_class->add_element =
401       GST_DEBUG_FUNCPTR (gst_opt_scheduler_add_element);
402   gstscheduler_class->remove_element =
403       GST_DEBUG_FUNCPTR (gst_opt_scheduler_remove_element);
404   gstscheduler_class->state_transition =
405       GST_DEBUG_FUNCPTR (gst_opt_scheduler_state_transition);
406   gstscheduler_class->scheduling_change =
407       GST_DEBUG_FUNCPTR (gst_opt_scheduler_scheduling_change);
408   gstscheduler_class->yield = GST_DEBUG_FUNCPTR (gst_opt_scheduler_yield);
409   gstscheduler_class->interrupt =
410       GST_DEBUG_FUNCPTR (gst_opt_scheduler_interrupt);
411   gstscheduler_class->error = GST_DEBUG_FUNCPTR (gst_opt_scheduler_error);
412   gstscheduler_class->pad_link = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_link);
413   gstscheduler_class->pad_unlink =
414       GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_unlink);
415   gstscheduler_class->clock_wait = NULL;
416   gstscheduler_class->iterate = GST_DEBUG_FUNCPTR (gst_opt_scheduler_iterate);
417   gstscheduler_class->show = GST_DEBUG_FUNCPTR (gst_opt_scheduler_show);
418
419 #ifdef USE_COTHREADS
420   do_cothreads_init (NULL);
421 #endif
422 }
423
424 static void
425 gst_opt_scheduler_init (GstOptScheduler * scheduler)
426 {
427   scheduler->elements = NULL;
428   scheduler->iterations = 1;
429   scheduler->max_recursion = 100;
430 }
431
432 static void
433 gst_opt_scheduler_dispose (GObject * object)
434 {
435   G_OBJECT_CLASS (parent_class)->dispose (object);
436 }
437
438 static gboolean
439 plugin_init (GstPlugin * plugin)
440 {
441   GstSchedulerFactory *factory;
442
443   GST_DEBUG_CATEGORY_INIT (debug_scheduler, "scheduler", 0,
444       "optimal scheduler");
445
446 #ifdef USE_COTHREADS
447   factory = gst_scheduler_factory_new ("opt" COTHREADS_NAME,
448       "An optimal scheduler using " COTHREADS_NAME " cothreads",
449       gst_opt_scheduler_get_type ());
450 #else
451   factory = gst_scheduler_factory_new ("opt",
452       "An optimal scheduler using no cothreads", gst_opt_scheduler_get_type ());
453 #endif
454
455   if (factory != NULL) {
456     gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory));
457   } else {
458     g_warning ("could not register scheduler: optimal");
459   }
460   return TRUE;
461 }
462
463 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
464     GST_VERSION_MINOR,
465     "gstopt" COTHREADS_NAME "scheduler",
466     "An optimal scheduler using " COTHREADS_NAME " cothreads",
467     plugin_init, VERSION, GST_LICENSE, GST_PACKAGE, GST_ORIGIN);
468
469
470 static GstOptSchedulerChain *
471 ref_chain (GstOptSchedulerChain * chain)
472 {
473   GST_LOG ("ref chain %p %d->%d", chain, chain->refcount, chain->refcount + 1);
474   chain->refcount++;
475
476   return chain;
477 }
478
479 static GstOptSchedulerChain *
480 unref_chain (GstOptSchedulerChain * chain)
481 {
482   GST_LOG ("unref chain %p %d->%d", chain,
483       chain->refcount, chain->refcount - 1);
484
485   if (--chain->refcount == 0) {
486     destroy_chain (chain);
487     chain = NULL;
488   }
489
490   return chain;
491 }
492
493 static GstOptSchedulerChain *
494 create_chain (GstOptScheduler * osched)
495 {
496   GstOptSchedulerChain *chain;
497
498   chain = g_new0 (GstOptSchedulerChain, 1);
499   chain->sched = osched;
500   chain->refcount = 1;
501   chain->flags = GST_OPT_SCHEDULER_CHAIN_DISABLED;
502
503   gst_object_ref (GST_OBJECT (osched));
504   osched->chains = g_slist_prepend (osched->chains, chain);
505
506   GST_LOG ("new chain %p", chain);
507
508   return chain;
509 }
510
511 static void
512 destroy_chain (GstOptSchedulerChain * chain)
513 {
514   GstOptScheduler *osched;
515
516   GST_LOG ("destroy chain %p", chain);
517
518   g_assert (chain->num_groups == 0);
519   g_assert (chain->groups == NULL);
520
521   osched = chain->sched;
522   osched->chains = g_slist_remove (osched->chains, chain);
523
524   gst_object_unref (GST_OBJECT (osched));
525
526   g_free (chain);
527 }
528
529 static GstOptSchedulerChain *
530 add_to_chain (GstOptSchedulerChain * chain, GstOptSchedulerGroup * group)
531 {
532   gboolean enabled;
533
534   GST_LOG ("adding group %p to chain %p", group, chain);
535
536   g_assert (group->chain == NULL);
537
538   group = ref_group (group);
539
540   group->chain = ref_chain (chain);
541   chain->groups = g_slist_prepend (chain->groups, group);
542   chain->num_groups++;
543
544   enabled = GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group);
545
546   if (enabled) {
547     /* we can now setup the scheduling of the group */
548     setup_group_scheduler (chain->sched, group);
549
550     chain->num_enabled++;
551     if (chain->num_enabled == chain->num_groups) {
552       GST_LOG ("enabling chain %p after adding of enabled group", chain);
553       GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
554     }
555   }
556
557   /* queue a resort of the group list, which determines which group will be run
558    * first. */
559   GST_OPT_SCHEDULER_CHAIN_SET_DIRTY (chain);
560
561   return chain;
562 }
563
564 static GstOptSchedulerChain *
565 remove_from_chain (GstOptSchedulerChain * chain, GstOptSchedulerGroup * group)
566 {
567   gboolean enabled;
568
569   GST_LOG ("removing group %p from chain %p", group, chain);
570
571   if (!chain)
572     return NULL;
573
574   g_assert (group);
575   g_assert (group->chain == chain);
576
577   enabled = GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group);
578
579   group->chain = NULL;
580   chain->groups = g_slist_remove (chain->groups, group);
581   chain->num_groups--;
582   unref_group (group);
583
584   if (chain->num_groups == 0)
585     chain = unref_chain (chain);
586   else {
587     /* removing an enabled group from the chain decrements the 
588      * enabled counter */
589     if (enabled) {
590       chain->num_enabled--;
591       if (chain->num_enabled == 0) {
592         GST_LOG ("disabling chain %p after removal of the only enabled group",
593             chain);
594         GST_OPT_SCHEDULER_CHAIN_DISABLE (chain);
595       }
596     } else {
597       if (chain->num_enabled == chain->num_groups) {
598         GST_LOG ("enabling chain %p after removal of the only disabled group",
599             chain);
600         GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
601       }
602     }
603   }
604
605   GST_OPT_SCHEDULER_CHAIN_SET_DIRTY (chain);
606
607   chain = unref_chain (chain);
608   return chain;
609 }
610
611 static GstOptSchedulerChain *
612 merge_chains (GstOptSchedulerChain * chain1, GstOptSchedulerChain * chain2)
613 {
614   GSList *walk;
615
616   g_assert (chain1 != NULL);
617
618   GST_LOG ("merging chain %p and %p", chain1, chain2);
619
620   /* FIXME: document how chain2 can be NULL */
621   if (chain1 == chain2 || chain2 == NULL)
622     return chain1;
623
624   /* switch if it's more efficient */
625   if (chain1->num_groups < chain2->num_groups) {
626     GstOptSchedulerChain *tmp = chain2;
627
628     chain2 = chain1;
629     chain1 = tmp;
630   }
631
632   walk = chain2->groups;
633   while (walk) {
634     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
635
636     walk = g_slist_next (walk);
637
638     GST_LOG ("reparenting group %p from chain %p to %p", group, chain2, chain1);
639
640     ref_group (group);
641
642     remove_from_chain (chain2, group);
643     add_to_chain (chain1, group);
644
645     unref_group (group);
646   }
647
648   /* chain2 is now freed, if nothing else was referencing it before */
649
650   return chain1;
651 }
652
653 /* sorts the group list so that terminal sinks come first -- prevents pileup of
654  * buffers in bufpens */
655 static void
656 sort_chain (GstOptSchedulerChain * chain)
657 {
658   GSList *original = chain->groups;
659   GSList *new = NULL;
660   GSList *walk, *links, *this;
661
662   /* if there's only one group, just return */
663   if (!original->next)
664     return;
665   /* otherwise, we know that all groups are somehow linked together */
666
667   GST_LOG ("sorting chain %p (%d groups)", chain, g_slist_length (original));
668
669   /* first find the terminal sinks */
670   for (walk = original; walk;) {
671     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
672
673     this = walk;
674     walk = walk->next;
675     if (group->group_links) {
676       gboolean is_sink = TRUE;
677
678       for (links = group->group_links; links; links = links->next)
679         if (((GstOptSchedulerGroupLink *) links->data)->src == group)
680           is_sink = FALSE;
681       if (is_sink) {
682         /* found one */
683         original = g_slist_remove_link (original, this);
684         new = g_slist_concat (new, this);
685       }
686     }
687   }
688   g_assert (new != NULL);
689
690   /* now look for the elements that are linked to the terminal sinks */
691   for (walk = new; walk; walk = walk->next) {
692     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
693
694     for (links = group->group_links; links; links = links->next) {
695       this =
696           g_slist_find (original,
697           ((GstOptSchedulerGroupLink *) links->data)->src);
698       if (this) {
699         original = g_slist_remove_link (original, this);
700         new = g_slist_concat (new, this);
701       }
702     }
703   }
704   g_assert (original == NULL);
705
706   chain->groups = new;
707 }
708
709 static void
710 chain_group_set_enabled (GstOptSchedulerChain * chain,
711     GstOptSchedulerGroup * group, gboolean enabled)
712 {
713   gboolean oldstate;
714
715   g_assert (group != NULL);
716   g_assert (chain != NULL);
717
718   GST_LOG
719       ("request to %d group %p in chain %p, have %d groups enabled out of %d",
720       enabled, group, chain, chain->num_enabled, chain->num_groups);
721
722   oldstate = (GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group) ? TRUE : FALSE);
723   if (oldstate == enabled) {
724     GST_LOG ("group %p in chain %p was in correct state", group, chain);
725     return;
726   }
727
728   if (enabled)
729     GST_OPT_SCHEDULER_GROUP_ENABLE (group);
730   else
731     GST_OPT_SCHEDULER_GROUP_DISABLE (group);
732
733   if (enabled) {
734     g_assert (chain->num_enabled < chain->num_groups);
735
736     chain->num_enabled++;
737
738     GST_DEBUG ("enable group %p in chain %p, now %d groups enabled out of %d",
739         group, chain, chain->num_enabled, chain->num_groups);
740
741     /* OK to call even if the scheduler (cothread context / schedulerfunc) was
742        setup already -- will get destroyed when the group is destroyed */
743     setup_group_scheduler (chain->sched, group);
744
745     if (chain->num_enabled == chain->num_groups) {
746       GST_DEBUG ("enable chain %p", chain);
747       GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
748     }
749   } else {
750     g_assert (chain->num_enabled > 0);
751
752     chain->num_enabled--;
753     GST_DEBUG ("disable group %p in chain %p, now %d groups enabled out of %d",
754         group, chain, chain->num_enabled, chain->num_groups);
755
756     if (chain->num_enabled == 0) {
757       GST_DEBUG ("disable chain %p", chain);
758       GST_OPT_SCHEDULER_CHAIN_DISABLE (chain);
759     }
760   }
761 }
762
763 /* recursively migrate the group and all connected groups into the new chain */
764 static void
765 chain_recursively_migrate_group (GstOptSchedulerChain * chain,
766     GstOptSchedulerGroup * group)
767 {
768   GSList *links;
769
770   /* group already in chain */
771   if (group->chain == chain)
772     return;
773
774   /* first remove the group from its old chain */
775   remove_from_chain (group->chain, group);
776   /* add to new chain */
777   add_to_chain (chain, group);
778
779   /* then follow all links */
780   links = group->group_links;
781   while (links) {
782     GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) links->data;
783
784     links = g_slist_next (links);
785
786     chain_recursively_migrate_group (chain, OTHER_GROUP_LINK (link, group));
787   }
788 }
789
790 static GstOptSchedulerGroup *
791 ref_group (GstOptSchedulerGroup * group)
792 {
793   GST_LOG ("ref group %p %d->%d", group, group->refcount, group->refcount + 1);
794
795   group->refcount++;
796
797   return group;
798 }
799
800 static GstOptSchedulerGroup *
801 unref_group (GstOptSchedulerGroup * group)
802 {
803   GST_LOG ("unref group %p %d->%d", group,
804       group->refcount, group->refcount - 1);
805
806   if (--group->refcount == 0) {
807     destroy_group (group);
808     group = NULL;
809   }
810
811   return group;
812 }
813
814 static GstOptSchedulerGroup *
815 create_group (GstOptSchedulerChain * chain, GstElement * element,
816     GstOptSchedulerGroupType type)
817 {
818   GstOptSchedulerGroup *group;
819
820   group = g_new0 (GstOptSchedulerGroup, 1);
821   GST_LOG ("new group %p, type %d", group, type);
822   group->refcount = 1;          /* float... */
823   group->flags = GST_OPT_SCHEDULER_GROUP_DISABLED;
824   group->type = type;
825
826   add_to_group (group, element, FALSE);
827   add_to_chain (chain, group);
828   group = unref_group (group);  /* ...and sink. */
829
830   /* group's refcount is now 2 (one for the element, one for the chain) */
831
832   return group;
833 }
834
835 static void
836 destroy_group (GstOptSchedulerGroup * group)
837 {
838   GST_LOG ("destroy group %p", group);
839
840   g_assert (group != NULL);
841   g_assert (group->elements == NULL);
842   g_assert (group->chain == NULL);
843   g_assert (group->group_links == NULL);
844
845   if (group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)
846     destroy_group_scheduler (group);
847
848   g_free (group);
849 }
850
851 static GstOptSchedulerGroup *
852 add_to_group (GstOptSchedulerGroup * group, GstElement * element,
853     gboolean with_links)
854 {
855   g_assert (group != NULL);
856   g_assert (element != NULL);
857
858   GST_DEBUG ("adding element \"%s\" to group %p", GST_ELEMENT_NAME (element),
859       group);
860
861   if (GST_ELEMENT_IS_DECOUPLED (element)) {
862     GST_DEBUG ("element \"%s\" is decoupled, not adding to group %p",
863         GST_ELEMENT_NAME (element), group);
864     return group;
865   }
866
867   g_assert (GST_ELEMENT_SCHED_GROUP (element) == NULL);
868
869   /* first increment the links that this group has with other groups through
870    * this element */
871   if (with_links)
872     group_inc_links_for_element (group, element);
873
874   /* Ref the group... */
875   GST_ELEMENT_SCHED_GROUP (element) = ref_group (group);
876
877   gst_object_ref (GST_OBJECT (element));
878   group->elements = g_slist_prepend (group->elements, element);
879   group->num_elements++;
880
881   if (gst_element_get_state (element) == GST_STATE_PLAYING) {
882     group_element_set_enabled (group, element, TRUE);
883   }
884
885   return group;
886 }
887
888 static GstOptSchedulerGroup *
889 remove_from_group (GstOptSchedulerGroup * group, GstElement * element)
890 {
891   GST_DEBUG ("removing element \"%s\" from group %p",
892       GST_ELEMENT_NAME (element), group);
893
894   g_assert (group != NULL);
895   g_assert (element != NULL);
896   g_assert (GST_ELEMENT_SCHED_GROUP (element) == group);
897
898   /* first decrement the links that this group has with other groups through
899    * this element */
900   group_dec_links_for_element (group, element);
901
902   group->elements = g_slist_remove (group->elements, element);
903   group->num_elements--;
904
905   /* if the element was an entry point in the group, clear the group's
906    * entry point, and mark it as unknown */
907   if (group->entry == element) {
908     group->entry = NULL;
909     group->type = GST_OPT_SCHEDULER_GROUP_UNKNOWN;
910   }
911
912   GST_ELEMENT_SCHED_GROUP (element) = NULL;
913   gst_object_unref (GST_OBJECT (element));
914
915   if (group->num_elements == 0) {
916     GST_LOG ("group %p is now empty", group);
917     /* don't know in what case group->chain would be NULL, but putting this here
918        in deference to 0.8 -- remove me in 0.9 */
919     if (group->chain) {
920       GST_LOG ("removing group %p from its chain", group);
921       chain_group_set_enabled (group->chain, group, FALSE);
922       remove_from_chain (group->chain, group);
923     }
924   }
925   group = unref_group (group);
926
927   return group;
928 }
929
930 /* FIXME need to check if the groups are of the same type -- otherwise need to
931    setup the scheduler again, if it is setup */
932 static GstOptSchedulerGroup *
933 merge_groups (GstOptSchedulerGroup * group1, GstOptSchedulerGroup * group2)
934 {
935   g_assert (group1 != NULL);
936
937   GST_DEBUG ("merging groups %p and %p", group1, group2);
938
939   if (group1 == group2 || group2 == NULL)
940     return group1;
941
942   /* make sure they end up in the same chain */
943   merge_chains (group1->chain, group2->chain);
944
945   while (group2 && group2->elements) {
946     GstElement *element = (GstElement *) group2->elements->data;
947
948     group2 = remove_from_group (group2, element);
949     add_to_group (group1, element, TRUE);
950   }
951
952   return group1;
953 }
954
955 /* setup the scheduler context for a group. The right schedule function
956  * is selected based on the group type and cothreads are created if 
957  * needed */
958 static void
959 setup_group_scheduler (GstOptScheduler * osched, GstOptSchedulerGroup * group)
960 {
961   GroupScheduleFunction wrapper;
962
963   GST_DEBUG ("setup group %p scheduler, type %d", group, group->type);
964
965   wrapper = unknown_group_schedule_function;
966
967   /* figure out the wrapper function for this group */
968   if (group->type == GST_OPT_SCHEDULER_GROUP_GET)
969     wrapper = get_group_schedule_function;
970   else if (group->type == GST_OPT_SCHEDULER_GROUP_LOOP)
971     wrapper = loop_group_schedule_function;
972
973 #ifdef USE_COTHREADS
974   if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
975     do_cothread_create (group->cothread, osched->context,
976         (cothread_func) wrapper, 0, (char **) group);
977   } else {
978     do_cothread_setfunc (group->cothread, osched->context,
979         (cothread_func) wrapper, 0, (char **) group);
980   }
981 #else
982   group->schedulefunc = wrapper;
983   group->argc = 0;
984   group->argv = (char **) group;
985 #endif
986   group->flags |= GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
987 }
988
989 static void
990 destroy_group_scheduler (GstOptSchedulerGroup * group)
991 {
992   g_assert (group);
993
994   if (group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)
995     g_warning ("destroying running group scheduler");
996
997 #ifdef USE_COTHREADS
998   if (group->cothread) {
999     do_cothread_destroy (group->cothread);
1000     group->cothread = NULL;
1001   }
1002 #else
1003   group->schedulefunc = NULL;
1004   group->argc = 0;
1005   group->argv = NULL;
1006 #endif
1007
1008   group->flags &= ~GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
1009 }
1010
1011 static void
1012 group_error_handler (GstOptSchedulerGroup * group)
1013 {
1014   GST_DEBUG ("group %p has errored", group);
1015
1016   chain_group_set_enabled (group->chain, group, FALSE);
1017   group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
1018 }
1019
1020 /* this function enables/disables an element, it will set/clear a flag on the element 
1021  * and tells the chain that the group is enabled if all elements inside the group are
1022  * enabled */
1023 static void
1024 group_element_set_enabled (GstOptSchedulerGroup * group, GstElement * element,
1025     gboolean enabled)
1026 {
1027   g_assert (group != NULL);
1028   g_assert (element != NULL);
1029
1030   GST_LOG
1031       ("request to %d element %s in group %p, have %d elements enabled out of %d",
1032       enabled, GST_ELEMENT_NAME (element), group, group->num_enabled,
1033       group->num_elements);
1034
1035   /* Note that if an unlinked PLAYING element is added to a bin, we have to
1036      create a new group to hold the element, and this function will be called
1037      before the group is added to the chain. Thus we have a valid case for
1038      group->chain==NULL. */
1039
1040   if (enabled) {
1041     g_assert (group->num_enabled < group->num_elements);
1042
1043     group->num_enabled++;
1044
1045     GST_DEBUG
1046         ("enable element %s in group %p, now %d elements enabled out of %d",
1047         GST_ELEMENT_NAME (element), group, group->num_enabled,
1048         group->num_elements);
1049
1050     if (group->num_enabled == group->num_elements) {
1051       if (!group->chain) {
1052         GST_DEBUG ("enable chainless group %p", group);
1053         GST_OPT_SCHEDULER_GROUP_ENABLE (group);
1054       } else {
1055         GST_LOG ("enable group %p", group);
1056         chain_group_set_enabled (group->chain, group, TRUE);
1057       }
1058     }
1059   } else {
1060     g_assert (group->num_enabled > 0);
1061
1062     group->num_enabled--;
1063
1064     GST_DEBUG
1065         ("disable element %s in group %p, now %d elements enabled out of %d",
1066         GST_ELEMENT_NAME (element), group, group->num_enabled,
1067         group->num_elements);
1068
1069     if (group->num_enabled == 0) {
1070       if (!group->chain) {
1071         GST_DEBUG ("disable chainless group %p", group);
1072         GST_OPT_SCHEDULER_GROUP_DISABLE (group);
1073       } else {
1074         GST_LOG ("disable group %p", group);
1075         chain_group_set_enabled (group->chain, group, FALSE);
1076       }
1077     }
1078   }
1079 }
1080
1081 /* a group is scheduled by doing a cothread switch to it or
1082  * by calling the schedule function. In the non-cothread case
1083  * we cannot run already running groups so we return FALSE here
1084  * to indicate this to the caller */
1085 static gboolean
1086 schedule_group (GstOptSchedulerGroup * group)
1087 {
1088   if (!group->entry) {
1089     GST_INFO ("not scheduling group %p without entry", group);
1090     return FALSE;
1091   }
1092 #ifdef USE_COTHREADS
1093   if (group->cothread)
1094     do_cothread_switch (group->cothread);
1095   else
1096     g_warning ("(internal error): trying to schedule group without cothread");
1097   return TRUE;
1098 #else
1099   /* cothreads automatically call the pre- and post-run functions for us;
1100    * without cothreads we need to call them manually */
1101   if (group->schedulefunc == NULL) {
1102     GST_INFO ("not scheduling group %p without schedulefunc", group);
1103     return FALSE;
1104   } else {
1105     GSList *l;
1106
1107     for (l = group->elements; l; l = l->next) {
1108       GstElement *e = (GstElement *) l->data;
1109
1110       if (e->pre_run_func)
1111         e->pre_run_func (e);
1112     }
1113
1114     group->schedulefunc (group->argc, group->argv);
1115
1116     for (l = group->elements; l; l = l->next) {
1117       GstElement *e = (GstElement *) l->data;
1118
1119       if (e->post_run_func)
1120         e->post_run_func (e);
1121     }
1122
1123   }
1124   return TRUE;
1125 #endif
1126 }
1127
1128 #ifndef USE_COTHREADS
1129 static void
1130 gst_opt_scheduler_schedule_run_queue (GstOptScheduler * osched)
1131 {
1132   GST_LOG_OBJECT (osched, "running queue: %d groups, recursed %d times",
1133       g_list_length (osched->runqueue),
1134       osched->recursion, g_list_length (osched->runqueue));
1135
1136   /* note that we have a ref on each group on the queue (unref after running) */
1137
1138   /* make sure we don't exceed max_recursion */
1139   if (osched->recursion > osched->max_recursion) {
1140     osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
1141     return;
1142   }
1143
1144   osched->recursion++;
1145
1146   while (osched->runqueue) {
1147     GstOptSchedulerGroup *group;
1148     gboolean res;
1149
1150     group = (GstOptSchedulerGroup *) osched->runqueue->data;
1151
1152     /* runqueue holds refcount to group */
1153     osched->runqueue = g_list_remove (osched->runqueue, group);
1154
1155     GST_LOG_OBJECT (osched, "scheduling group %p", group);
1156
1157     res = schedule_group (group);
1158     if (!res) {
1159       g_warning ("error scheduling group %p", group);
1160       group_error_handler (group);
1161     } else {
1162       GST_LOG_OBJECT (osched, "done scheduling group %p", group);
1163     }
1164     unref_group (group);
1165   }
1166
1167   GST_LOG_OBJECT (osched, "run queue length after scheduling %d",
1168       g_list_length (osched->runqueue));
1169
1170   osched->recursion--;
1171 }
1172 #endif
1173
1174 /* a chain is scheduled by picking the first active group and scheduling it */
1175 static void
1176 schedule_chain (GstOptSchedulerChain * chain)
1177 {
1178   GSList *groups;
1179   GstOptScheduler *osched;
1180
1181   osched = chain->sched;
1182
1183   /* if the chain has changed, we need to resort the groups so we enter in the
1184      proper place */
1185   if (GST_OPT_SCHEDULER_CHAIN_IS_DIRTY (chain))
1186     sort_chain (chain);
1187   GST_OPT_SCHEDULER_CHAIN_SET_CLEAN (chain);
1188
1189   groups = chain->groups;
1190   while (groups) {
1191     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
1192
1193     if (!GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group)) {
1194       ref_group (group);
1195       GST_LOG ("scheduling group %p in chain %p", group, chain);
1196
1197 #ifdef USE_COTHREADS
1198       schedule_group (group);
1199 #else
1200       osched->recursion = 0;
1201       if (!g_list_find (osched->runqueue, group)) {
1202         ref_group (group);
1203         osched->runqueue = g_list_append (osched->runqueue, group);
1204       }
1205       gst_opt_scheduler_schedule_run_queue (osched);
1206 #endif
1207
1208       GST_LOG ("done scheduling group %p in chain %p", group, chain);
1209       unref_group (group);
1210       break;
1211     }
1212
1213     groups = g_slist_next (groups);
1214   }
1215 }
1216
1217 /* a get-based group is scheduled by getting a buffer from the get based
1218  * entry point and by pushing the buffer to the peer.
1219  * We also set the running flag on this group for as long as this
1220  * function is running. */
1221 static int
1222 get_group_schedule_function (int argc, char *argv[])
1223 {
1224   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1225   GstElement *entry = group->entry;
1226   const GList *pads = gst_element_get_pad_list (entry);
1227
1228   GST_LOG ("executing get-based group %p", group);
1229
1230   group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
1231
1232   while (pads) {
1233     GstData *data;
1234     GstPad *pad = GST_PAD (pads->data);
1235
1236     pads = g_list_next (pads);
1237
1238     /* skip sinks and ghostpads */
1239     if (!GST_PAD_IS_SRC (pad) || !GST_IS_REAL_PAD (pad))
1240       continue;
1241
1242     GST_DEBUG ("doing get and push on pad \"%s:%s\" in group %p",
1243         GST_DEBUG_PAD_NAME (pad), group);
1244
1245     data = gst_pad_call_get_function (pad);
1246     if (data) {
1247       if (GST_EVENT_IS_INTERRUPT (data)) {
1248         gst_event_unref (GST_EVENT (data));
1249         break;
1250       }
1251       gst_pad_push (pad, data);
1252     }
1253   }
1254
1255   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
1256
1257   return 0;
1258 }
1259
1260 /* a loop-based group is scheduled by calling the loop function
1261  * on the entry point. 
1262  * We also set the running flag on this group for as long as this
1263  * function is running. */
1264 static int
1265 loop_group_schedule_function (int argc, char *argv[])
1266 {
1267   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1268   GstElement *entry = group->entry;
1269
1270   GST_LOG ("executing loop-based group %p", group);
1271
1272   group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
1273
1274   GST_DEBUG ("calling loopfunc of element %s in group %p",
1275       GST_ELEMENT_NAME (entry), group);
1276
1277   if (entry->loopfunc)
1278     entry->loopfunc (entry);
1279   else
1280     group_error_handler (group);
1281
1282   GST_LOG ("loopfunc ended of element %s in group %p",
1283       GST_ELEMENT_NAME (entry), group);
1284
1285   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
1286
1287   return 0;
1288
1289 }
1290
1291 /* the function to schedule an unknown group, which just gives an error */
1292 static int
1293 unknown_group_schedule_function (int argc, char *argv[])
1294 {
1295   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
1296
1297   g_warning ("(internal error) unknown group type %d, disabling\n",
1298       group->type);
1299   group_error_handler (group);
1300
1301   return 0;
1302 }
1303
1304 /* this function is called when the first element of a chain-loop or a loop-loop
1305  * link performs a push to the loop element. We then schedule the
1306  * group with the loop-based element until the bufpen is empty */
1307 static void
1308 gst_opt_scheduler_loop_wrapper (GstPad * sinkpad, GstData * data)
1309 {
1310   GstOptSchedulerGroup *group;
1311   GstOptScheduler *osched;
1312   GstRealPad *peer;
1313
1314   group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (sinkpad));
1315   osched = group->chain->sched;
1316   peer = GST_RPAD_PEER (sinkpad);
1317
1318   GST_LOG ("chain handler for loop-based pad %" GST_PTR_FORMAT, sinkpad);
1319
1320 #ifdef USE_COTHREADS
1321   if (GST_PAD_BUFLIST (peer)) {
1322     g_warning ("deadlock detected, disabling group %p", group);
1323     group_error_handler (group);
1324   } else {
1325     GST_LOG ("queueing data %p on %s:%s's bufpen", data,
1326         GST_DEBUG_PAD_NAME (peer));
1327     GST_PAD_BUFPEN (peer) = g_list_append (GST_PAD_BUFLIST (peer), data);
1328     schedule_group (group);
1329   }
1330 #else
1331   GST_LOG ("queueing data %p on %s:%s's bufpen", data,
1332       GST_DEBUG_PAD_NAME (peer));
1333   GST_PAD_BUFPEN (peer) = g_list_append (GST_PAD_BUFLIST (peer), data);
1334   if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
1335     GST_LOG ("adding group %p to runqueue", group);
1336     if (!g_list_find (osched->runqueue, group)) {
1337       ref_group (group);
1338       osched->runqueue = g_list_append (osched->runqueue, group);
1339     }
1340   }
1341 #endif
1342
1343   GST_LOG ("%d buffers left on %s:%s's bufpen after chain handler",
1344       g_list_length (GST_PAD_BUFLIST (peer)));
1345 }
1346
1347 /* this function is called by a loop based element that performs a
1348  * pull on a sinkpad. We schedule the peer group until the bufpen
1349  * is filled with the buffer so that this function  can return */
1350 static GstData *
1351 gst_opt_scheduler_get_wrapper (GstPad * srcpad)
1352 {
1353   GstData *data;
1354   GstOptSchedulerGroup *group;
1355   GstOptScheduler *osched;
1356   gboolean disabled;
1357
1358   GST_LOG ("get handler for %" GST_PTR_FORMAT, srcpad);
1359
1360   /* first try to grab a queued buffer */
1361   if (GST_PAD_BUFLIST (srcpad)) {
1362     data = GST_PAD_BUFLIST (srcpad)->data;
1363     GST_PAD_BUFPEN (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), data);
1364
1365     GST_LOG ("returning popped queued data %p", data);
1366
1367     return data;
1368   }
1369
1370   /* else we need to schedule the peer element */
1371   group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (srcpad));
1372   osched = group->chain->sched;
1373   data = NULL;
1374   disabled = FALSE;
1375
1376   do {
1377     GST_LOG ("scheduling upstream group %p to fill bufpen", group);
1378 #ifdef USE_COTHREADS
1379     schedule_group (group);
1380 #else
1381     if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
1382       ref_group (group);
1383
1384       if (!g_list_find (osched->runqueue, group)) {
1385         ref_group (group);
1386         osched->runqueue = g_list_append (osched->runqueue, group);
1387       }
1388
1389       GST_LOG ("recursing into scheduler group %p", group);
1390       gst_opt_scheduler_schedule_run_queue (osched);
1391       GST_LOG ("return from recurse group %p", group);
1392
1393       /* if the other group was disabled we might have to break out of the loop */
1394       disabled = GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group);
1395       group = unref_group (group);
1396       /* group is gone */
1397       if (group == NULL) {
1398         /* if the group was gone we also might have to break out of the loop */
1399         disabled = TRUE;
1400       }
1401     } else {
1402       /* in this case, the group was running and we wanted to swtich to it,
1403        * this is not allowed in the optimal scheduler (yet) */
1404       g_warning ("deadlock detected, disabling group %p", group);
1405       group_error_handler (group);
1406       return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1407     }
1408 #endif
1409     /* if the scheduler interrupted, make sure we send an INTERRUPTED event to the
1410      * loop based element */
1411     if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
1412       GST_INFO ("scheduler interrupted, return interrupt event");
1413       data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1414     } else {
1415       if (GST_PAD_BUFLIST (srcpad)) {
1416         data = GST_PAD_BUFLIST (srcpad)->data;
1417         GST_PAD_BUFPEN (srcpad) =
1418             g_list_remove (GST_PAD_BUFLIST (srcpad), data);
1419       } else if (disabled) {
1420         /* no buffer in queue and peer group was disabled */
1421         data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
1422       }
1423     }
1424   }
1425   while (data == NULL);
1426
1427   GST_LOG ("get handler, returning data %p, queue length %d",
1428       data, g_list_length (GST_PAD_BUFLIST (srcpad)));
1429
1430   return data;
1431 }
1432
1433 static void
1434 pad_clear_queued (GstPad * srcpad, gpointer user_data)
1435 {
1436   GList *buflist = GST_PAD_BUFLIST (srcpad);
1437
1438   if (buflist) {
1439     GST_LOG ("need to clear some buffers");
1440     g_list_foreach (buflist, (GFunc) gst_data_unref, NULL);
1441     g_list_free (buflist);
1442     GST_PAD_BUFPEN (srcpad) = NULL;
1443   }
1444 }
1445
1446 static gboolean
1447 gst_opt_scheduler_event_wrapper (GstPad * srcpad, GstEvent * event)
1448 {
1449   gboolean flush;
1450
1451   GST_DEBUG ("intercepting event %d on pad %s:%s",
1452       GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (srcpad));
1453
1454   /* figure out if this is a flush event */
1455   switch (GST_EVENT_TYPE (event)) {
1456     case GST_EVENT_FLUSH:
1457       flush = TRUE;
1458       break;
1459     case GST_EVENT_SEEK:
1460     case GST_EVENT_SEEK_SEGMENT:
1461       flush = GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH;
1462       break;
1463     default:
1464       flush = FALSE;
1465       break;
1466   }
1467
1468   if (flush) {
1469     GST_LOG ("event is flush");
1470
1471     pad_clear_queued (srcpad, NULL);
1472   }
1473   return GST_RPAD_EVENTFUNC (srcpad) (srcpad, event);
1474 }
1475
1476 static GstElementStateReturn
1477 gst_opt_scheduler_state_transition (GstScheduler * sched, GstElement * element,
1478     gint transition)
1479 {
1480   GstOptSchedulerGroup *group;
1481   GstElementStateReturn res = GST_STATE_SUCCESS;
1482
1483   GST_DEBUG ("element \"%s\" state change %d", GST_ELEMENT_NAME (element),
1484       transition);
1485
1486   /* we check the state of the managing pipeline here */
1487   if (GST_IS_BIN (element)) {
1488     if (GST_SCHEDULER_PARENT (sched) == element) {
1489       GST_LOG ("parent \"%s\" changed state", GST_ELEMENT_NAME (element));
1490
1491       switch (transition) {
1492         case GST_STATE_PLAYING_TO_PAUSED:
1493           GST_INFO ("setting scheduler state to stopped");
1494           GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_STOPPED;
1495           break;
1496         case GST_STATE_PAUSED_TO_PLAYING:
1497           GST_INFO ("setting scheduler state to running");
1498           GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_RUNNING;
1499           break;
1500         default:
1501           GST_LOG ("no interesting state change, doing nothing");
1502       }
1503     }
1504     return res;
1505   }
1506
1507   /* we don't care about decoupled elements after this */
1508   if (GST_ELEMENT_IS_DECOUPLED (element))
1509     return GST_STATE_SUCCESS;
1510
1511   /* get the group of the element */
1512   group = GST_ELEMENT_SCHED_GROUP (element);
1513
1514   switch (transition) {
1515     case GST_STATE_PAUSED_TO_PLAYING:
1516       /* an element without a group has to be an unlinked src, sink
1517        * filter element */
1518       if (!group) {
1519         GST_INFO ("element \"%s\" has no group", GST_ELEMENT_NAME (element));
1520       }
1521       /* else construct the scheduling context of this group and enable it */
1522       else {
1523         group_element_set_enabled (group, element, TRUE);
1524       }
1525       break;
1526     case GST_STATE_PLAYING_TO_PAUSED:
1527       /* if the element still has a group, we disable it */
1528       if (group)
1529         group_element_set_enabled (group, element, FALSE);
1530       break;
1531     case GST_STATE_PAUSED_TO_READY:
1532     {
1533       GList *pads = (GList *) gst_element_get_pad_list (element);
1534
1535       g_list_foreach (pads, (GFunc) pad_clear_queued, NULL);
1536       break;
1537     }
1538     default:
1539       break;
1540   }
1541
1542   return res;
1543 }
1544
1545 static void
1546 gst_opt_scheduler_scheduling_change (GstScheduler * sched, GstElement * element)
1547 {
1548   g_warning ("scheduling change, implement me");
1549 }
1550
1551 static void
1552 get_group (GstElement * element, GstOptSchedulerGroup ** group)
1553 {
1554   GstOptSchedulerCtx *ctx;
1555
1556   /*GList *pads; */
1557
1558   ctx = GST_ELEMENT_SCHED_CONTEXT (element);
1559   if (ctx)
1560     *group = ctx->group;
1561   else
1562     *group = NULL;
1563 }
1564
1565 /*
1566  * the idea is to put the two elements into the same group. 
1567  * - When no element is inside a group, we create a new group and add 
1568  *   the elements to it. 
1569  * - When one of the elements has a group, add the other element to 
1570  *   that group
1571  * - if both of the elements have a group, we merge the groups, which
1572  *   will also merge the chains.
1573  * Group links must be managed by the caller.
1574  */
1575 static GstOptSchedulerGroup *
1576 group_elements (GstOptScheduler * osched, GstElement * element1,
1577     GstElement * element2, GstOptSchedulerGroupType type)
1578 {
1579   GstOptSchedulerGroup *group1, *group2, *group = NULL;
1580
1581   get_group (element1, &group1);
1582   get_group (element2, &group2);
1583
1584   /* none of the elements is added to a group, create a new group
1585    * and chain to add the elements to */
1586   if (!group1 && !group2) {
1587     GstOptSchedulerChain *chain;
1588
1589     GST_DEBUG ("creating new group to hold \"%s\" and \"%s\"",
1590         GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
1591
1592     chain = create_chain (osched);
1593     group = create_group (chain, element1, type);
1594     add_to_group (group, element2, TRUE);
1595   }
1596   /* the first element has a group */
1597   else if (group1) {
1598     GST_DEBUG ("adding \"%s\" to \"%s\"'s group",
1599         GST_ELEMENT_NAME (element2), GST_ELEMENT_NAME (element1));
1600
1601     /* the second element also has a group, merge */
1602     if (group2)
1603       merge_groups (group1, group2);
1604     /* the second element has no group, add it to the group
1605      * of the first element */
1606     else
1607       add_to_group (group1, element2, TRUE);
1608
1609     group = group1;
1610   }
1611   /* element1 has no group, element2 does. Add element1 to the
1612    * group of element2 */
1613   else {
1614     GST_DEBUG ("adding \"%s\" to \"%s\"'s group",
1615         GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
1616     add_to_group (group2, element1, TRUE);
1617     group = group2;
1618   }
1619   return group;
1620 }
1621
1622 /*
1623  * increment link counts between groups -- it's important that src is actually
1624  * the src group, so we can introspect the topology later
1625  */
1626 static void
1627 group_inc_link (GstOptSchedulerGroup * src, GstOptSchedulerGroup * sink)
1628 {
1629   GSList *links = src->group_links;
1630   gboolean done = FALSE;
1631   GstOptSchedulerGroupLink *link;
1632
1633   /* first try to find a previous link */
1634   while (links && !done) {
1635     link = (GstOptSchedulerGroupLink *) links->data;
1636     links = g_slist_next (links);
1637
1638     if (IS_GROUP_LINK (link, src, sink)) {
1639       /* we found a link to this group, increment the link count */
1640       link->count++;
1641       GST_LOG ("incremented group link count between %p and %p to %d",
1642           src, sink, link->count);
1643       done = TRUE;
1644     }
1645   }
1646   if (!done) {
1647     /* no link was found, create a new one */
1648     link = g_new0 (GstOptSchedulerGroupLink, 1);
1649
1650     link->src = src;
1651     link->sink = sink;
1652     link->count = 1;
1653
1654     src->group_links = g_slist_prepend (src->group_links, link);
1655     sink->group_links = g_slist_prepend (sink->group_links, link);
1656
1657     GST_DEBUG ("added group link between %p and %p", src, sink);
1658   }
1659 }
1660
1661 /*
1662  * decrement link counts between groups, returns TRUE if the link count reaches
1663  * 0 -- note that the groups are not necessarily ordered as (src, sink) like
1664  * inc_link requires
1665  */
1666 static gboolean
1667 group_dec_link (GstOptSchedulerGroup * group1, GstOptSchedulerGroup * group2)
1668 {
1669   GSList *links = group1->group_links;
1670   gboolean res = FALSE;
1671   GstOptSchedulerGroupLink *link;
1672
1673   while (links) {
1674     link = (GstOptSchedulerGroupLink *) links->data;
1675     links = g_slist_next (links);
1676
1677     if (IS_GROUP_LINK (link, group1, group2)) {
1678       g_assert (link->count > 0);
1679       link->count--;
1680       GST_LOG ("link count between %p and %p is now %d",
1681           group1, group2, link->count);
1682       if (link->count == 0) {
1683         group1->group_links = g_slist_remove (group1->group_links, link);
1684         group2->group_links = g_slist_remove (group2->group_links, link);
1685         g_free (link);
1686         GST_DEBUG ("removed group link between %p and %p", group1, group2);
1687         res = TRUE;
1688       }
1689       break;
1690     }
1691   }
1692   return res;
1693 }
1694
1695
1696 typedef enum
1697 {
1698   GST_OPT_INVALID,
1699   GST_OPT_GET_TO_CHAIN,
1700   GST_OPT_LOOP_TO_CHAIN,
1701   GST_OPT_GET_TO_LOOP,
1702   GST_OPT_CHAIN_TO_CHAIN,
1703   GST_OPT_CHAIN_TO_LOOP,
1704   GST_OPT_LOOP_TO_LOOP
1705 }
1706 LinkType;
1707
1708 /*
1709  * Entry points for this scheduler.
1710  */
1711 static void
1712 gst_opt_scheduler_setup (GstScheduler * sched)
1713 {
1714 #ifdef USE_COTHREADS
1715   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1716
1717   /* first create thread context */
1718   if (osched->context == NULL) {
1719     GST_DEBUG ("initializing cothread context");
1720     osched->context = do_cothread_context_init ();
1721   }
1722 #endif
1723 }
1724
1725 static void
1726 gst_opt_scheduler_reset (GstScheduler * sched)
1727 {
1728 #ifdef USE_COTHREADS
1729   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1730   GSList *chains = osched->chains;
1731
1732   while (chains) {
1733     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
1734     GSList *groups = chain->groups;
1735
1736     while (groups) {
1737       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
1738
1739       destroy_group_scheduler (group);
1740       groups = groups->next;
1741     }
1742     chains = chains->next;
1743   }
1744
1745   if (osched->context) {
1746     do_cothread_context_destroy (osched->context);
1747     osched->context = NULL;
1748   }
1749 #endif
1750 }
1751
1752 static void
1753 gst_opt_scheduler_add_element (GstScheduler * sched, GstElement * element)
1754 {
1755   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1756   GstOptSchedulerCtx *ctx;
1757   const GList *pads;
1758
1759   GST_DEBUG_OBJECT (sched, "adding element \"%s\"", GST_OBJECT_NAME (element));
1760
1761   /* decoupled elements are not added to the scheduler lists */
1762   if (GST_ELEMENT_IS_DECOUPLED (element))
1763     return;
1764
1765   ctx = g_new0 (GstOptSchedulerCtx, 1);
1766   GST_ELEMENT (element)->sched_private = ctx;
1767   ctx->flags = GST_OPT_SCHEDULER_CTX_DISABLED;
1768
1769   /* set event handler on all pads here so events work unconnected too;
1770    * in _link, it can be overruled if need be */
1771   /* FIXME: we should also do this when new pads on the element are created;
1772      but there are no hooks, so we do it again in _link */
1773   pads = gst_element_get_pad_list (element);
1774   while (pads) {
1775     GstPad *pad = GST_PAD (pads->data);
1776
1777     pads = g_list_next (pads);
1778
1779     if (!GST_IS_REAL_PAD (pad))
1780       continue;
1781     GST_RPAD_EVENTHANDLER (pad) = GST_RPAD_EVENTFUNC (pad);
1782   }
1783
1784   /* loop based elements *always* end up in their own group. It can eventually
1785    * be merged with another group when a link is made */
1786   if (element->loopfunc) {
1787     GstOptSchedulerGroup *group;
1788     GstOptSchedulerChain *chain;
1789
1790     chain = create_chain (osched);
1791
1792     group = create_group (chain, element, GST_OPT_SCHEDULER_GROUP_LOOP);
1793     group->entry = element;
1794
1795     GST_LOG ("added element \"%s\" as loop based entry",
1796         GST_ELEMENT_NAME (element));
1797   }
1798 }
1799
1800 static void
1801 gst_opt_scheduler_remove_element (GstScheduler * sched, GstElement * element)
1802 {
1803   GstOptSchedulerGroup *group;
1804
1805   GST_DEBUG_OBJECT (sched, "removing element \"%s\"",
1806       GST_OBJECT_NAME (element));
1807
1808   /* decoupled elements are not added to the scheduler lists and should therefore
1809    * not be removed */
1810   if (GST_ELEMENT_IS_DECOUPLED (element))
1811     return;
1812
1813   /* the element is guaranteed to live in it's own group/chain now */
1814   get_group (element, &group);
1815   if (group) {
1816     remove_from_group (group, element);
1817   }
1818
1819   g_free (GST_ELEMENT (element)->sched_private);
1820   GST_ELEMENT (element)->sched_private = NULL;
1821 }
1822
1823 static gboolean
1824 gst_opt_scheduler_yield (GstScheduler * sched, GstElement * element)
1825 {
1826 #ifdef USE_COTHREADS
1827   /* yield hands control to the main cothread context if the requesting 
1828    * element is the entry point of the group */
1829   GstOptSchedulerGroup *group;
1830
1831   get_group (element, &group);
1832   if (group && group->entry == element)
1833     do_cothread_switch (do_cothread_get_main (((GstOptScheduler *) sched)->
1834             context));
1835
1836   return FALSE;
1837 #else
1838   g_warning ("element %s performs a yield, please fix the element",
1839       GST_ELEMENT_NAME (element));
1840   return TRUE;
1841 #endif
1842 }
1843
1844 static gboolean
1845 gst_opt_scheduler_interrupt (GstScheduler * sched, GstElement * element)
1846 {
1847   GST_INFO ("interrupt from \"%s\"", GST_OBJECT_NAME (element));
1848
1849 #ifdef USE_COTHREADS
1850   do_cothread_switch (do_cothread_get_main (((GstOptScheduler *) sched)->
1851           context));
1852   return FALSE;
1853 #else
1854   {
1855     GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1856
1857     GST_INFO ("scheduler set interrupted state");
1858     osched->state = GST_OPT_SCHEDULER_STATE_INTERRUPTED;
1859   }
1860   return TRUE;
1861 #endif
1862 }
1863
1864 static void
1865 gst_opt_scheduler_error (GstScheduler * sched, GstElement * element)
1866 {
1867   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1868   GstOptSchedulerGroup *group;
1869
1870   get_group (element, &group);
1871   if (group)
1872     group_error_handler (group);
1873
1874   osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
1875 }
1876
1877 /* link pads, merge groups and chains */
1878 static void
1879 gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
1880     GstPad * sinkpad)
1881 {
1882   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
1883   LinkType type = GST_OPT_INVALID;
1884   GstElement *src_element, *sink_element;
1885
1886   GST_INFO ("scheduling link between %s:%s and %s:%s",
1887       GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
1888
1889   src_element = GST_PAD_PARENT (srcpad);
1890   sink_element = GST_PAD_PARENT (sinkpad);
1891
1892   /* first we need to figure out what type of link we're dealing
1893    * with */
1894   if (src_element->loopfunc && sink_element->loopfunc)
1895     type = GST_OPT_LOOP_TO_LOOP;
1896   else {
1897     if (src_element->loopfunc) {
1898       if (GST_RPAD_CHAINFUNC (sinkpad))
1899         type = GST_OPT_LOOP_TO_CHAIN;
1900     } else if (sink_element->loopfunc) {
1901       if (GST_RPAD_GETFUNC (srcpad)) {
1902         type = GST_OPT_GET_TO_LOOP;
1903         /* this could be tricky, the get based source could 
1904          * already be part of a loop based group in another pad,
1905          * we assert on that for now */
1906         if (GST_ELEMENT_SCHED_CONTEXT (src_element) != NULL &&
1907             GST_ELEMENT_SCHED_GROUP (src_element) != NULL) {
1908           GstOptSchedulerGroup *group = GST_ELEMENT_SCHED_GROUP (src_element);
1909
1910           /* if the loop based element is the entry point we're ok, if it
1911            * isn't then we have multiple loop based elements in this group */
1912           if (group->entry != sink_element) {
1913             g_error
1914                 ("internal error: cannot schedule get to loop in multi-loop based group");
1915             return;
1916           }
1917         }
1918       } else
1919         type = GST_OPT_CHAIN_TO_LOOP;
1920     } else {
1921       if (GST_RPAD_GETFUNC (srcpad) && GST_RPAD_CHAINFUNC (sinkpad)) {
1922         type = GST_OPT_GET_TO_CHAIN;
1923         /* the get based source could already be part of a loop 
1924          * based group in another pad, we assert on that for now */
1925         if (GST_ELEMENT_SCHED_CONTEXT (src_element) != NULL &&
1926             GST_ELEMENT_SCHED_GROUP (src_element) != NULL) {
1927           GstOptSchedulerGroup *group = GST_ELEMENT_SCHED_GROUP (src_element);
1928
1929           /* if the get based element is the entry point we're ok, if it
1930            * isn't then we have a mixed loop/chain based group */
1931           if (group->entry != src_element) {
1932             g_error ("internal error: cannot schedule get to chain "
1933                 "with mixed loop/chain based group");
1934             return;
1935           }
1936         }
1937       } else
1938         type = GST_OPT_CHAIN_TO_CHAIN;
1939     }
1940   }
1941
1942   /* since we can't set event handlers on pad creation after addition, it is
1943    * best we set all of them again to the default before linking */
1944   GST_RPAD_EVENTHANDLER (srcpad) = GST_RPAD_EVENTFUNC (srcpad);
1945   GST_RPAD_EVENTHANDLER (sinkpad) = GST_RPAD_EVENTFUNC (sinkpad);
1946
1947   /* for each link type, perform specific actions */
1948   switch (type) {
1949     case GST_OPT_GET_TO_CHAIN:
1950     {
1951       GstOptSchedulerGroup *group = NULL;
1952
1953       GST_LOG ("get to chain based link");
1954
1955       /* setup get/chain handlers */
1956       GST_RPAD_GETHANDLER (srcpad) = gst_pad_call_get_function;
1957       GST_RPAD_CHAINHANDLER (sinkpad) = gst_pad_call_chain_function;
1958
1959       /* the two elements should be put into the same group, 
1960        * this also means that they are in the same chain automatically */
1961       group = group_elements (osched, src_element, sink_element,
1962           GST_OPT_SCHEDULER_GROUP_GET);
1963
1964       /* if there is not yet an entry in the group, select the source
1965        * element as the entry point and mark the group as a get based
1966        * group */
1967       if (!group->entry) {
1968         group->entry = src_element;
1969         group->type = GST_OPT_SCHEDULER_GROUP_GET;
1970
1971         GST_DEBUG ("setting \"%s\" as entry point of _get-based group %p",
1972             GST_ELEMENT_NAME (src_element), group);
1973       }
1974       break;
1975     }
1976     case GST_OPT_LOOP_TO_CHAIN:
1977     case GST_OPT_CHAIN_TO_CHAIN:
1978       GST_LOG ("loop/chain to chain based link");
1979
1980       GST_RPAD_CHAINHANDLER (sinkpad) = gst_pad_call_chain_function;
1981
1982       /* the two elements should be put into the same group, this also means
1983        * that they are in the same chain automatically, in case of a loop-based
1984        * src_element, there will be a group for src_element and sink_element
1985        * will be added to it. In the case a new group is created, we can't know
1986        * the type so we pass UNKNOWN as an arg */
1987       group_elements (osched, src_element, sink_element,
1988           GST_OPT_SCHEDULER_GROUP_UNKNOWN);
1989       break;
1990     case GST_OPT_GET_TO_LOOP:
1991       GST_LOG ("get to loop based link");
1992
1993       GST_RPAD_GETHANDLER (srcpad) = gst_pad_call_get_function;
1994
1995       /* the two elements should be put into the same group, this also means
1996        * that they are in the same chain automatically, sink_element is
1997        * loop-based so it already has a group where src_element will be added
1998        * to */
1999       group_elements (osched, src_element, sink_element,
2000           GST_OPT_SCHEDULER_GROUP_LOOP);
2001       break;
2002     case GST_OPT_CHAIN_TO_LOOP:
2003     case GST_OPT_LOOP_TO_LOOP:
2004     {
2005       GstOptSchedulerGroup *group1, *group2;
2006
2007       GST_LOG ("chain/loop to loop based link");
2008
2009       GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_loop_wrapper;
2010       GST_RPAD_GETHANDLER (srcpad) = gst_opt_scheduler_get_wrapper;
2011       /* events on the srcpad have to be intercepted as we might need to
2012        * flush the buffer lists, so override the given eventfunc */
2013       GST_RPAD_EVENTHANDLER (srcpad) = gst_opt_scheduler_event_wrapper;
2014
2015       group1 = GST_ELEMENT_SCHED_GROUP (src_element);
2016       group2 = GST_ELEMENT_SCHED_GROUP (sink_element);
2017
2018       g_assert (group2 != NULL);
2019
2020       /* group2 is guaranteed to exist as it contains a loop-based element.
2021        * group1 only exists if src_element is linked to some other element */
2022       if (!group1) {
2023         /* create a new group for src_element as it cannot be merged into another group
2024          * here. we create the group in the same chain as the loop-based element. */
2025         GST_DEBUG ("creating new group for element %s",
2026             GST_ELEMENT_NAME (src_element));
2027         group1 =
2028             create_group (group2->chain, src_element,
2029             GST_OPT_SCHEDULER_GROUP_LOOP);
2030       } else {
2031         /* both elements are already in a group, make sure they are added to
2032          * the same chain */
2033         merge_chains (group1->chain, group2->chain);
2034       }
2035       group_inc_link (group1, group2);
2036       break;
2037     }
2038     case GST_OPT_INVALID:
2039       g_error ("(internal error) invalid element link, what are you doing?");
2040       break;
2041   }
2042 }
2043
2044 /* 
2045  * checks if an element is still linked to some other element in the group. 
2046  * no checking is done on the brokenpad arg 
2047  */
2048 static gboolean
2049 element_has_link_with_group (GstElement * element, GstOptSchedulerGroup * group,
2050     GstPad * brokenpad)
2051 {
2052   gboolean linked = FALSE;
2053   const GList *pads;
2054
2055   /* see if the element has no more links to the peer group */
2056   pads = gst_element_get_pad_list (element);
2057   while (pads && !linked) {
2058     GstPad *pad = GST_PAD (pads->data);
2059
2060     pads = g_list_next (pads);
2061
2062     /* we only operate on real pads and on the pad that is not broken */
2063     if (!GST_IS_REAL_PAD (pad) || pad == brokenpad)
2064       continue;
2065
2066     if (GST_PAD_PEER (pad)) {
2067       GstElement *parent;
2068       GstOptSchedulerGroup *parentgroup;
2069
2070       /* see in what group this element is */
2071       parent = GST_PAD_PARENT (GST_PAD_PEER (pad));
2072
2073       /* links with decoupled elements are valid */
2074       if (GST_ELEMENT_IS_DECOUPLED (parent)) {
2075         linked = TRUE;
2076       } else {
2077         /* for non-decoupled elements we need to check the group */
2078         get_group (parent, &parentgroup);
2079
2080         /* if it's in the same group, we're still linked */
2081         if (parentgroup == group)
2082           linked = TRUE;
2083       }
2084     }
2085   }
2086   return linked;
2087 }
2088
2089 /* 
2090  * checks if a target group is still reachable from the group without taking the broken
2091  * group link into account.
2092  */
2093 static gboolean
2094 group_can_reach_group (GstOptSchedulerGroup * group,
2095     GstOptSchedulerGroup * target)
2096 {
2097   gboolean reachable = FALSE;
2098   const GSList *links = group->group_links;
2099
2100   GST_LOG ("checking if group %p can reach %p", group, target);
2101
2102   /* seems like we found the target element */
2103   if (group == target) {
2104     GST_LOG ("found way to reach %p", target);
2105     return TRUE;
2106   }
2107
2108   /* if the group is marked as visited, we don't need to check here */
2109   if (GST_OPT_SCHEDULER_GROUP_IS_FLAG_SET (group,
2110           GST_OPT_SCHEDULER_GROUP_VISITED)) {
2111     GST_LOG ("already visited %p", group);
2112     return FALSE;
2113   }
2114
2115   /* mark group as visited */
2116   GST_OPT_SCHEDULER_GROUP_SET_FLAG (group, GST_OPT_SCHEDULER_GROUP_VISITED);
2117
2118   while (links && !reachable) {
2119     GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) links->data;
2120     GstOptSchedulerGroup *other;
2121
2122     links = g_slist_next (links);
2123
2124     /* find other group in this link */
2125     other = OTHER_GROUP_LINK (link, group);
2126
2127     GST_LOG ("found link from %p to %p, count %d", group, other, link->count);
2128
2129     /* check if we can reach the target recursiveley */
2130     reachable = group_can_reach_group (other, target);
2131   }
2132   /* unset the visited flag, note that this is not optimal as we might be checking
2133    * groups several times when they are reachable with a loop. An alternative would be
2134    * to not clear the group flag at this stage but clear all flags in the chain when
2135    * all groups are checked. */
2136   GST_OPT_SCHEDULER_GROUP_UNSET_FLAG (group, GST_OPT_SCHEDULER_GROUP_VISITED);
2137
2138   GST_LOG ("leaving group %p with %s", group, (reachable ? "TRUE" : "FALSE"));
2139
2140   return reachable;
2141 }
2142
2143 /*
2144  * Go through all the pads of the given element and decrement the links that
2145  * this group has with the group of the peer element.  This function is mainly used
2146  * to update the group connections before we remove the element from the group.
2147  */
2148 static void
2149 group_dec_links_for_element (GstOptSchedulerGroup * group, GstElement * element)
2150 {
2151   GList *l;
2152   GstPad *pad;
2153   GstOptSchedulerGroup *peer_group;
2154
2155   for (l = GST_ELEMENT_PADS (element); l; l = l->next) {
2156     pad = (GstPad *) l->data;
2157     if (GST_IS_REAL_PAD (pad) && GST_PAD_PEER (pad)) {
2158       get_group (GST_PAD_PARENT (GST_PAD_PEER (pad)), &peer_group);
2159       if (peer_group && peer_group != group)
2160         group_dec_link (group, peer_group);
2161     }
2162   }
2163 }
2164
2165 /*
2166  * Go through all the pads of the given element and increment the links that
2167  * this group has with the group of the peer element.  This function is mainly used
2168  * to update the group connections before we add the element to the group.
2169  */
2170 static void
2171 group_inc_links_for_element (GstOptSchedulerGroup * group, GstElement * element)
2172 {
2173   GList *l;
2174   GstPad *pad;
2175   GstOptSchedulerGroup *peer_group;
2176
2177   for (l = GST_ELEMENT_PADS (element); l; l = l->next) {
2178     pad = (GstPad *) l->data;
2179     if (GST_IS_REAL_PAD (pad) && GST_PAD_PEER (pad)) {
2180       get_group (GST_PAD_PARENT (GST_PAD_PEER (pad)), &peer_group);
2181       if (peer_group && peer_group != group)
2182         group_inc_link (group, peer_group);
2183     }
2184   }
2185 }
2186
2187 static void
2188 gst_opt_scheduler_pad_unlink (GstScheduler * sched,
2189     GstPad * srcpad, GstPad * sinkpad)
2190 {
2191   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2192   GstElement *src_element, *sink_element;
2193   GstOptSchedulerGroup *group1, *group2;
2194
2195   GST_INFO ("unscheduling link between %s:%s and %s:%s",
2196       GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
2197
2198   src_element = GST_PAD_PARENT (srcpad);
2199   sink_element = GST_PAD_PARENT (sinkpad);
2200
2201   get_group (src_element, &group1);
2202   get_group (sink_element, &group2);
2203
2204   /* for decoupled elements (that are never put into a group) we use the
2205    * group of the peer element for the remainder of the algorithm */
2206   if (GST_ELEMENT_IS_DECOUPLED (src_element)) {
2207     group1 = group2;
2208   }
2209   if (GST_ELEMENT_IS_DECOUPLED (sink_element)) {
2210     group2 = group1;
2211   }
2212
2213   /* if one the elements has no group (anymore) we don't really care 
2214    * about the link */
2215   if (!group1 || !group2) {
2216     GST_LOG
2217         ("one (or both) of the elements is not in a group, not interesting");
2218     return;
2219   }
2220
2221   /* easy part, groups are different */
2222   if (group1 != group2) {
2223     gboolean zero;
2224
2225     GST_LOG ("elements are in different groups");
2226
2227     /* we can remove the links between the groups now */
2228     zero = group_dec_link (group1, group2);
2229
2230     /* if the groups are not directly connected anymore, we have to perform a
2231      * recursive check to see if they are really unlinked */
2232     if (zero) {
2233       gboolean still_link;
2234       GstOptSchedulerChain *chain;
2235
2236       /* see if group1 and group2 are still connected in any indirect way */
2237       still_link = group_can_reach_group (group1, group2);
2238
2239       GST_DEBUG ("group %p %s reach group %p", group1,
2240           (still_link ? "can" : "can't"), group2);
2241       if (!still_link) {
2242         /* groups are really disconnected, migrate one group to a new chain */
2243         chain = create_chain (osched);
2244         chain_recursively_migrate_group (chain, group1);
2245
2246         GST_DEBUG ("migrated group %p to new chain %p", group1, chain);
2247       }
2248     } else {
2249       GST_DEBUG ("group %p still has direct link with group %p", group1,
2250           group2);
2251     }
2252   }
2253   /* hard part, groups are equal */
2254   else {
2255     gboolean still_link1, still_link2;
2256     GstOptSchedulerGroup *group;
2257
2258     /* since group1 == group2, it doesn't matter which group we take */
2259     group = group1;
2260
2261     GST_LOG ("elements are in the same group %p", group);
2262
2263     /* check if the element is still linked to some other element in the group,
2264      * we pass the pad that is broken up as an arg because a link on that pad
2265      * is not valid anymore.
2266      * Note that this check is only to make sure that a single element can be removed 
2267      * completely from the group, we also have to check for migrating several 
2268      * elements to a new group. */
2269     still_link1 = element_has_link_with_group (src_element, group, srcpad);
2270     still_link2 = element_has_link_with_group (sink_element, group, sinkpad);
2271     /* if there is still a link, we don't need to break this group */
2272     if (still_link1 && still_link2) {
2273       GSList *l;
2274       GList *m;
2275       int linkcount;
2276
2277       GST_LOG ("elements still have links with other elements in the group");
2278
2279       while (group && group->elements)
2280         for (l = group->elements; l && l->data; l = l->next) {
2281           GstElement *element = (GstElement *) l->data;
2282
2283           if (!element || !GST_IS_ELEMENT (element) ||
2284               GST_ELEMENT_IS_DECOUPLED (element))
2285             continue;
2286
2287           linkcount = 0;
2288           GST_LOG ("Examining %s\n", GST_ELEMENT_NAME (element));
2289           for (m = GST_ELEMENT_PADS (element); m; m = m->next) {
2290             GstPad *peer, *pad;
2291             GstElement *parent;
2292             GstOptSchedulerGroup *peer_group;
2293
2294             pad = (GstPad *) m->data;
2295             if (!pad || !GST_IS_REAL_PAD (pad))
2296               continue;
2297
2298             peer = GST_PAD_PEER (pad);
2299             if (!peer || !GST_IS_REAL_PAD (peer))
2300               continue;
2301
2302             parent = GST_PAD_PARENT (GST_PAD_PEER (pad));
2303             get_group (parent, &peer_group);
2304             if (peer_group && peer_group != group) {
2305               GST_LOG ("pad %s is linked with %s\n",
2306                   GST_PAD_NAME (pad), GST_ELEMENT_NAME (parent));
2307               linkcount++;
2308             }
2309           }
2310
2311           if (linkcount < 2) {
2312             group = remove_from_group (group, element);
2313           }
2314           /* if linkcount == 2, it will be unlinked later on */
2315           else if (linkcount > 2) {
2316             g_warning
2317                 ("opt: Can't handle element %s with 3 or more links, aborting",
2318                 GST_ELEMENT_NAME (element));
2319             return;
2320           }
2321         }
2322       /* Peer element will be caught during next iteration */
2323       return;
2324     }
2325
2326     /* now check which one of the elements we can remove from the group */
2327     if (!still_link1) {
2328       /* we only remove elements that are not the entry point of a loop based
2329        * group and are not decoupled */
2330       if (!(group->entry == src_element &&
2331               group->type == GST_OPT_SCHEDULER_GROUP_LOOP) &&
2332           !GST_ELEMENT_IS_DECOUPLED (src_element)) {
2333         GST_LOG ("el ement1 is separated from the group");
2334
2335         remove_from_group (group, src_element);
2336       } else {
2337         GST_LOG ("src_element is decoupled or entry in loop based group");
2338       }
2339     }
2340
2341     if (!still_link2) {
2342       /* we only remove elements that are not the entry point of a loop based
2343        * group and are not decoupled */
2344       if (!(group->entry == sink_element &&
2345               group->type == GST_OPT_SCHEDULER_GROUP_LOOP) &&
2346           !GST_ELEMENT_IS_DECOUPLED (sink_element)) {
2347         GST_LOG ("sink_element is separated from the group");
2348
2349         remove_from_group (group, sink_element);
2350       } else {
2351         GST_LOG ("sink_element is decoupled or entry in loop based group");
2352       }
2353     }
2354   }
2355 }
2356
2357 /* a scheduler iteration is done by looping and scheduling the active chains */
2358 static GstSchedulerState
2359 gst_opt_scheduler_iterate (GstScheduler * sched)
2360 {
2361   GstSchedulerState state = GST_SCHEDULER_STATE_STOPPED;
2362   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2363   gint iterations = osched->iterations;
2364
2365   osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
2366
2367   //gst_opt_scheduler_show (sched);
2368
2369   GST_DEBUG_OBJECT (sched, "iterating");
2370
2371   while (iterations) {
2372     gboolean scheduled = FALSE;
2373     GSList *chains;
2374
2375     /* we have to schedule each of the scheduler chains now */
2376     chains = osched->chains;
2377     while (chains) {
2378       GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
2379
2380       ref_chain (chain);
2381       /* if the chain is not disabled, schedule it */
2382       if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
2383         GST_LOG ("scheduling chain %p", chain);
2384         schedule_chain (chain);
2385         scheduled = TRUE;
2386       } else {
2387         GST_LOG ("not scheduling disabled chain %p", chain);
2388       }
2389
2390       /* don't schedule any more chains when in error */
2391       if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
2392         GST_ERROR_OBJECT (sched, "in error state");
2393         break;
2394       } else if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
2395         GST_DEBUG_OBJECT (osched, "got interrupted, continue with next chain");
2396         osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
2397       }
2398
2399       chains = g_slist_next (chains);
2400       unref_chain (chain);
2401     }
2402
2403     /* at this point it's possible that the scheduler state is
2404      * in error, we then return an error */
2405     if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
2406       state = GST_SCHEDULER_STATE_ERROR;
2407       break;
2408     } else {
2409       /* if chains were scheduled, return our current state */
2410       if (scheduled)
2411         state = GST_SCHEDULER_STATE (sched);
2412       /* if no chains were scheduled, we say we are stopped */
2413       else {
2414         state = GST_SCHEDULER_STATE_STOPPED;
2415         break;
2416       }
2417     }
2418     if (iterations > 0)
2419       iterations--;
2420   }
2421
2422   return state;
2423 }
2424
2425
2426 static void
2427 gst_opt_scheduler_show (GstScheduler * sched)
2428 {
2429   GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
2430   GSList *chains;
2431
2432   g_print ("iterations:    %d\n", osched->iterations);
2433   g_print ("max recursion: %d\n", osched->max_recursion);
2434
2435   chains = osched->chains;
2436   while (chains) {
2437     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
2438     GSList *groups = chain->groups;
2439
2440     chains = g_slist_next (chains);
2441
2442     g_print ("+- chain %p: refcount %d, %d groups, %d enabled, flags %d\n",
2443         chain, chain->refcount, chain->num_groups, chain->num_enabled,
2444         chain->flags);
2445
2446     while (groups) {
2447       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
2448       GSList *elements = group->elements;
2449       GSList *group_links = group->group_links;
2450
2451       groups = g_slist_next (groups);
2452
2453       g_print
2454           (" +- group %p: refcount %d, %d elements, %d enabled, flags %d, entry %s, %s\n",
2455           group, group->refcount, group->num_elements, group->num_enabled,
2456           group->flags,
2457           (group->entry ? GST_ELEMENT_NAME (group->entry) : "(none)"),
2458           (group->type ==
2459               GST_OPT_SCHEDULER_GROUP_GET ? "get-based" : "loop-based"));
2460
2461       while (elements) {
2462         GstElement *element = (GstElement *) elements->data;
2463
2464         elements = g_slist_next (elements);
2465
2466         g_print ("  +- element %s\n", GST_ELEMENT_NAME (element));
2467       }
2468       while (group_links) {
2469         GstOptSchedulerGroupLink *link =
2470             (GstOptSchedulerGroupLink *) group_links->data;
2471
2472         group_links = g_slist_next (group_links);
2473
2474         g_print ("group link %p between %p and %p, count %d\n",
2475             link, link->src, link->sink, link->count);
2476       }
2477     }
2478   }
2479 }
2480
2481 static void
2482 gst_opt_scheduler_get_property (GObject * object, guint prop_id,
2483     GValue * value, GParamSpec * pspec)
2484 {
2485   GstOptScheduler *osched;
2486
2487   g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
2488
2489   osched = GST_OPT_SCHEDULER (object);
2490
2491   switch (prop_id) {
2492     case ARG_ITERATIONS:
2493       g_value_set_int (value, osched->iterations);
2494       break;
2495     case ARG_MAX_RECURSION:
2496       g_value_set_int (value, osched->max_recursion);
2497       break;
2498     default:
2499       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2500       break;
2501   }
2502 }
2503
2504 static void
2505 gst_opt_scheduler_set_property (GObject * object, guint prop_id,
2506     const GValue * value, GParamSpec * pspec)
2507 {
2508   GstOptScheduler *osched;
2509
2510   g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
2511
2512   osched = GST_OPT_SCHEDULER (object);
2513
2514   switch (prop_id) {
2515     case ARG_ITERATIONS:
2516       osched->iterations = g_value_get_int (value);
2517       break;
2518     case ARG_MAX_RECURSION:
2519       osched->max_recursion = g_value_get_int (value);
2520       break;
2521     default:
2522       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2523       break;
2524   }
2525 }