4a7ddc54bc10a00247921891d299f6a350a507ad
[platform/upstream/gstreamer.git] / gst / schedulers / gstoptimalscheduler.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstscheduler.c: Default scheduling code for most cases
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /*#define GST_DEBUG_ENABLED */
24 #include <gst/gst.h>
25
26 #ifdef USE_COTHREADS
27 # include "cothreads_compat.h"
28 #else
29 # define COTHREADS_NAME_CAPITAL ""
30 # define COTHREADS_NAME         ""
31 #endif
32
33 #define GST_ELEMENT_SCHED_CONTEXT(elem)         ((GstOptSchedulerCtx*) (GST_ELEMENT_CAST (elem)->sched_private))
34 #define GST_ELEMENT_SCHED_GROUP(elem)           (GST_ELEMENT_SCHED_CONTEXT (elem)->group)
35 #define GST_PAD_BUFLIST(pad)                    ((GList*) (GST_REAL_PAD_CAST(pad)->sched_private))
36
37 #define GST_ELEMENT_COTHREAD_STOPPING                   GST_ELEMENT_SCHEDULER_PRIVATE1
38 #define GST_ELEMENT_IS_COTHREAD_STOPPING(element)       GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
39 #define GST_ELEMENT_INTERRUPTED                         GST_ELEMENT_SCHEDULER_PRIVATE2
40 #define GST_ELEMENT_IS_INTERRUPTED(element)             GST_FLAG_IS_SET((element), GST_ELEMENT_INTERRUPTED)
41
42 typedef struct _GstOptScheduler GstOptScheduler;
43 typedef struct _GstOptSchedulerClass GstOptSchedulerClass;
44
45 #define GST_TYPE_OPT_SCHEDULER \
46   (gst_opt_scheduler_get_type())
47 #define GST_OPT_SCHEDULER(obj) \
48   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_OPT_SCHEDULER,GstOptScheduler))
49 #define GST_OPT_SCHEDULER_CLASS(klass) \
50   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_OPT_SCHEDULER,GstOptSchedulerClass))
51 #define GST_IS_OPT_SCHEDULER(obj) \
52   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_OPT_SCHEDULER))
53 #define GST_IS_OPT_SCHEDULER_CLASS(obj) \
54   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_OPT_SCHEDULER))
55
56 #define GST_OPT_SCHEDULER_CAST(sched)   ((GstOptScheduler *)(sched))
57
58 typedef enum {
59   GST_OPT_SCHEDULER_STATE_NONE,
60   GST_OPT_SCHEDULER_STATE_STOPPED,
61   GST_OPT_SCHEDULER_STATE_ERROR,
62   GST_OPT_SCHEDULER_STATE_RUNNING,
63   GST_OPT_SCHEDULER_STATE_INTERRUPTED
64 } GstOptSchedulerState;
65
66 struct _GstOptScheduler {
67   GstScheduler           parent;
68
69   GstOptSchedulerState   state;
70
71 #ifdef USE_COTHREADS
72   cothread_context      *context;
73 #endif
74   gint                   iterations;
75
76   GSList                *elements;
77   GSList                *chains;
78
79   GList                 *runqueue;
80   gint                   recursion;
81
82   gint                   max_recursion;
83 };
84
85 struct _GstOptSchedulerClass {
86   GstSchedulerClass parent_class;
87 };
88
89 static GType _gst_opt_scheduler_type = 0;
90
91 typedef enum {
92   GST_OPT_SCHEDULER_CHAIN_DIRTY                 = (1 << 1),     
93   GST_OPT_SCHEDULER_CHAIN_DISABLED              = (1 << 2),
94   GST_OPT_SCHEDULER_CHAIN_RUNNING               = (1 << 3),
95 } GstOptSchedulerChainFlags;
96
97 #define GST_OPT_SCHEDULER_CHAIN_DISABLE(chain)          ((chain)->flags |= GST_OPT_SCHEDULER_CHAIN_DISABLED)
98 #define GST_OPT_SCHEDULER_CHAIN_ENABLE(chain)           ((chain)->flags &= ~GST_OPT_SCHEDULER_CHAIN_DISABLED)
99 #define GST_OPT_SCHEDULER_CHAIN_IS_DISABLED(chain)      ((chain)->flags & GST_OPT_SCHEDULER_CHAIN_DISABLED)
100
101 typedef struct _GstOptSchedulerChain GstOptSchedulerChain;
102
103 struct _GstOptSchedulerChain {
104   GstOptScheduler               *sched;
105
106   GstOptSchedulerChainFlags      flags;
107   
108   GSList                        *groups;                        /* the groups in this chain */
109   gint                           num_groups;
110   gint                           num_enabled;
111 };
112
113 /* 
114  * elements that are scheduled in one cothread 
115  */
116 typedef enum {
117   GST_OPT_SCHEDULER_GROUP_DIRTY                 = (1 << 1),     /* this group has been modified */
118   GST_OPT_SCHEDULER_GROUP_COTHREAD_STOPPING     = (1 << 2),     /* the group's cothread stops after one iteration */
119   GST_OPT_SCHEDULER_GROUP_DISABLED              = (1 << 3),     /* this group is disabled */
120   GST_OPT_SCHEDULER_GROUP_RUNNING               = (1 << 4),     /* this group is running */
121   GST_OPT_SCHEDULER_GROUP_SCHEDULABLE           = (1 << 5),     /* this group is schedulable */
122 } GstOptSchedulerGroupFlags;
123
124 typedef enum {
125   GST_OPT_SCHEDULER_GROUP_GET                   = 1,
126   GST_OPT_SCHEDULER_GROUP_LOOP                  = 2,
127 } GstOptSchedulerGroupType;
128
129 #define GST_OPT_SCHEDULER_GROUP_SET_FLAG(group,flag)    ((group)->flags |= (flag))
130 #define GST_OPT_SCHEDULER_GROUP_UNSET_FLAG(group,flag)  ((group)->flags &= (flag))
131 #define GST_OPT_SCHEDULER_GROUP_IS_FLAG_SET(group,flag) ((group)->flags & (flag))
132
133 #define GST_OPT_SCHEDULER_GROUP_DISABLE(group)          ((group)->flags |= GST_OPT_SCHEDULER_GROUP_DISABLED)
134 #define GST_OPT_SCHEDULER_GROUP_ENABLE(group)           ((group)->flags &= ~GST_OPT_SCHEDULER_GROUP_DISABLED)
135 #define GST_OPT_SCHEDULER_GROUP_IS_ENABLED(group)       (!((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED))
136 #define GST_OPT_SCHEDULER_GROUP_IS_DISABLED(group)      ((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED)
137
138 typedef struct _GstOptSchedulerGroup GstOptSchedulerGroup;
139
140 typedef int (*GroupScheduleFunction)    (int argc, char *argv[]);
141
142 struct _GstOptSchedulerGroup {
143   GstOptSchedulerChain          *chain;                 /* the chain this group belongs to */
144   GstOptSchedulerGroupFlags      flags;                 /* flags for this group */
145   GstOptSchedulerGroupType       type;                  /* flags for this group */
146
147   GSList                        *elements;              /* elements of this group */
148   gint                           num_elements;
149   gint                           num_enabled;
150   GstElement                    *entry;                 /* the group's entry point */
151
152   GSList                        *providers;             /* other groups that provide data
153                                                            for this group */
154
155 #ifdef USE_COTHREADS
156   cothread                      *cothread;              /* the cothread of this group */
157 #endif
158   GroupScheduleFunction          schedulefunc;
159   int                            argc;
160   char                         **argv;
161 };
162
163 /* 
164  * Scheduler private data for an element 
165  */
166 typedef struct _GstOptSchedulerCtx GstOptSchedulerCtx;
167
168 typedef enum {
169   GST_OPT_SCHEDULER_CTX_DISABLED                = (1 << 1),     /* the element is disabled */
170 } GstOptSchedulerCtxFlags;
171
172 struct _GstOptSchedulerCtx {
173   GstOptSchedulerGroup *group;                          /* the group this element belongs to */
174
175   GstOptSchedulerCtxFlags flags;                        /* flags for this element */
176 };
177
178 enum
179 {
180   ARG_0,
181   ARG_ITERATIONS,
182   ARG_MAX_RECURSION,
183 };
184  
185
186 static void             gst_opt_scheduler_class_init            (GstOptSchedulerClass *klass);
187 static void             gst_opt_scheduler_init                  (GstOptScheduler *scheduler);
188
189 static void             gst_opt_scheduler_set_property          (GObject *object, guint prop_id,
190                                                                  const GValue *value, GParamSpec *pspec);
191 static void             gst_opt_scheduler_get_property          (GObject *object, guint prop_id,
192                                                                  GValue *value, GParamSpec *pspec);
193
194 static void             gst_opt_scheduler_dispose               (GObject *object);
195
196 static void             gst_opt_scheduler_setup                 (GstScheduler *sched);
197 static void             gst_opt_scheduler_reset                 (GstScheduler *sched);
198 static void             gst_opt_scheduler_add_element           (GstScheduler *sched, GstElement *element);
199 static void             gst_opt_scheduler_remove_element        (GstScheduler *sched, GstElement *element);
200 static GstElementStateReturn  
201                         gst_opt_scheduler_state_transition      (GstScheduler *sched, GstElement *element, gint transition);
202 static void             gst_opt_scheduler_scheduling_change     (GstScheduler *sched, GstElement *element);
203 static void             gst_opt_scheduler_lock_element          (GstScheduler *sched, GstElement *element);
204 static void             gst_opt_scheduler_unlock_element        (GstScheduler *sched, GstElement *element);
205 static gboolean         gst_opt_scheduler_yield                 (GstScheduler *sched, GstElement *element);
206 static gboolean         gst_opt_scheduler_interrupt             (GstScheduler *sched, GstElement *element);
207 static void             gst_opt_scheduler_error                 (GstScheduler *sched, GstElement *element);
208 static void             gst_opt_scheduler_pad_link              (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
209 static void             gst_opt_scheduler_pad_unlink    (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
210 static GstPad*          gst_opt_scheduler_pad_select            (GstScheduler *sched, GList *padlist);
211 static GstClockReturn   gst_opt_scheduler_clock_wait            (GstScheduler *sched, GstElement *element,
212                                                                  GstClockID id, GstClockTimeDiff *jitter);
213 static GstSchedulerState
214                         gst_opt_scheduler_iterate               (GstScheduler *sched);
215
216 static void             gst_opt_scheduler_show                  (GstScheduler *sched);
217
218 static GstSchedulerClass *parent_class = NULL;
219
220 static GType
221 gst_opt_scheduler_get_type (void)
222 {
223   if (!_gst_opt_scheduler_type) {
224     static const GTypeInfo scheduler_info = {
225       sizeof (GstOptSchedulerClass),
226       NULL,
227       NULL,
228       (GClassInitFunc) gst_opt_scheduler_class_init,
229       NULL,
230       NULL,
231       sizeof (GstOptScheduler),
232       0,
233       (GInstanceInitFunc) gst_opt_scheduler_init,
234       NULL
235     };
236
237     _gst_opt_scheduler_type = g_type_register_static (GST_TYPE_SCHEDULER, 
238                     "GstOpt"COTHREADS_NAME_CAPITAL"Scheduler", &scheduler_info, 0);
239   }
240   return _gst_opt_scheduler_type;
241 }
242
243 static void
244 gst_opt_scheduler_class_init (GstOptSchedulerClass *klass)
245 {
246   GObjectClass *gobject_class;
247   GstObjectClass *gstobject_class;
248   GstSchedulerClass *gstscheduler_class;
249
250   gobject_class = (GObjectClass*)klass;
251   gstobject_class = (GstObjectClass*)klass;
252   gstscheduler_class = (GstSchedulerClass*)klass;
253
254   parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
255
256   gobject_class->set_property   = GST_DEBUG_FUNCPTR (gst_opt_scheduler_set_property);
257   gobject_class->get_property   = GST_DEBUG_FUNCPTR (gst_opt_scheduler_get_property);
258   gobject_class->dispose        = GST_DEBUG_FUNCPTR (gst_opt_scheduler_dispose);
259
260   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ITERATIONS,
261     g_param_spec_int ("iterations", "Iterations", 
262                       "Number of groups to schedule in one iteration (-1 == until EOS/error)",
263                       -1, G_MAXINT, 1, G_PARAM_READWRITE));
264 #ifndef USE_COTHREADS
265   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_RECURSION,
266     g_param_spec_int ("max_recursion", "Max recursion", 
267                       "Maximum number of recursions",
268                       1, G_MAXINT, 100, G_PARAM_READWRITE));
269 #endif
270
271   gstscheduler_class->setup             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_setup);
272   gstscheduler_class->reset             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_reset);
273   gstscheduler_class->add_element       = GST_DEBUG_FUNCPTR (gst_opt_scheduler_add_element);
274   gstscheduler_class->remove_element    = GST_DEBUG_FUNCPTR (gst_opt_scheduler_remove_element);
275   gstscheduler_class->state_transition  = GST_DEBUG_FUNCPTR (gst_opt_scheduler_state_transition);
276   gstscheduler_class->scheduling_change = GST_DEBUG_FUNCPTR (gst_opt_scheduler_scheduling_change);
277   gstscheduler_class->lock_element      = GST_DEBUG_FUNCPTR (gst_opt_scheduler_lock_element);
278   gstscheduler_class->unlock_element    = GST_DEBUG_FUNCPTR (gst_opt_scheduler_unlock_element);
279   gstscheduler_class->yield             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_yield);
280   gstscheduler_class->interrupt         = GST_DEBUG_FUNCPTR (gst_opt_scheduler_interrupt);
281   gstscheduler_class->error             = GST_DEBUG_FUNCPTR (gst_opt_scheduler_error);
282   gstscheduler_class->pad_link  = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_link);
283   gstscheduler_class->pad_unlink        = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_unlink);
284   gstscheduler_class->pad_select        = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_select);
285   gstscheduler_class->clock_wait        = GST_DEBUG_FUNCPTR (gst_opt_scheduler_clock_wait);
286   gstscheduler_class->iterate           = GST_DEBUG_FUNCPTR (gst_opt_scheduler_iterate);
287   gstscheduler_class->show              = GST_DEBUG_FUNCPTR (gst_opt_scheduler_show);
288 }
289
290 static void
291 gst_opt_scheduler_init (GstOptScheduler *scheduler)
292 {
293   scheduler->elements = NULL;
294   scheduler->iterations = 1;
295   scheduler->max_recursion = 100;
296 }
297
298 static void
299 gst_opt_scheduler_dispose (GObject *object)
300 {
301   G_OBJECT_CLASS (parent_class)->dispose (object);
302 }
303
304 static gboolean
305 plugin_init (GModule *module, GstPlugin *plugin)
306 {
307   GstSchedulerFactory *factory;
308
309   gst_plugin_set_longname (plugin, "An optimal scheduler");
310
311   factory = gst_scheduler_factory_new ("opt"COTHREADS_NAME,
312                                        "An optimal scheduler using "COTHREADS_NAME" cothreads",
313                                       gst_opt_scheduler_get_type());
314
315   if (factory != NULL) {
316     gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory));
317   }
318   else {
319     g_warning ("could not register scheduler: optimal");
320   }
321   return TRUE;
322 }
323
324 GstPluginDesc plugin_desc = {
325   GST_VERSION_MAJOR,
326   GST_VERSION_MINOR,
327   "gstopt"COTHREADS_NAME"scheduler",
328   plugin_init
329 };
330
331
332 static void
333 delete_chain (GstOptSchedulerChain *chain)
334 {
335   GSList *groups;
336   GstOptScheduler *osched;
337   
338   GST_INFO (GST_CAT_SCHEDULING, "delete chain %p", chain);
339
340   osched = chain->sched;
341
342   osched->chains = g_slist_remove (osched->chains, chain);
343
344   groups = chain->groups;
345   while (groups) {
346     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
347
348     /* clear all group's chain pointers, so they can never reference us again */
349     if (group->chain == chain)
350       group->chain = NULL;
351     
352     groups = g_slist_next (groups);
353   }
354
355   g_slist_free (chain->groups);
356   g_free (chain);
357 }
358
359 static GstOptSchedulerChain*
360 add_to_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
361 {
362   GST_INFO (GST_CAT_SCHEDULING, "adding group %p to chain %p", group, chain);
363
364   g_assert (group->chain == NULL);
365
366   chain->groups = g_slist_prepend (chain->groups, group);
367   group->chain = chain;
368   chain->num_groups++;
369
370   return chain;
371 }
372
373 static GstOptSchedulerChain*
374 create_chain (GstOptScheduler *osched)
375 {
376   GstOptSchedulerChain *chain;
377
378   chain = g_new0 (GstOptSchedulerChain, 1);
379   chain->sched = osched;
380
381   osched->chains = g_slist_prepend (osched->chains, chain);
382
383   GST_INFO (GST_CAT_SCHEDULING, "new chain %p", chain);
384
385   return chain;
386 }
387
388 static GstOptSchedulerChain*
389 remove_from_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
390 {
391   GST_INFO (GST_CAT_SCHEDULING, "removing group %p from chain %p", group, chain);
392
393   if (!chain)
394     return NULL;
395
396   g_assert (group);
397   g_assert (group->chain == chain);
398
399   chain->groups = g_slist_remove (chain->groups, group);
400   chain->num_groups--;
401
402   group->chain = NULL;
403
404   if (chain->num_groups == 0) {
405     GST_INFO (GST_CAT_SCHEDULING, "chain %p is empty, removing", chain);
406     delete_chain (chain);
407
408     return NULL;
409   }
410
411   return chain;
412 }
413
414 static GstOptSchedulerChain*
415 merge_chains (GstOptSchedulerChain *chain1, GstOptSchedulerChain *chain2)
416 {
417   GSList *walk;
418
419   g_assert (chain1 != NULL);
420   
421   GST_INFO (GST_CAT_SCHEDULING, "merging chain %p and %p", chain1, chain2);
422   
423   if (chain1 == chain2 || chain2 == NULL)
424     return chain1;
425
426   walk = chain2->groups;
427   while (walk) {
428     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
429     walk = g_slist_next (walk);
430
431     group->chain = NULL;
432     add_to_chain (chain1, group);
433   }
434   delete_chain (chain2);
435
436   return chain1;
437 }
438
439 static void
440 chain_group_set_enabled (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group, gboolean enabled)
441 {
442   g_assert (chain != NULL);
443   g_assert (group != NULL);
444
445   GST_INFO (GST_CAT_SCHEDULING, "request to %d group %p in chain %p, have %d groups enabled out of %d", 
446                   enabled, group, chain, chain->num_enabled, chain->num_groups);
447
448   if (enabled)
449     GST_OPT_SCHEDULER_GROUP_ENABLE (group);
450   else 
451     GST_OPT_SCHEDULER_GROUP_DISABLE (group);
452
453   if (enabled) {
454     if (chain->num_enabled < chain->num_groups)
455       chain->num_enabled++;
456
457     GST_INFO (GST_CAT_SCHEDULING, "enable group %p in chain %p, now %d groups enabled out of %d", group, chain,
458                     chain->num_enabled, chain->num_groups);
459
460     if (chain->num_enabled == chain->num_groups) {
461       GST_INFO (GST_CAT_SCHEDULING, "enable chain %p", chain);
462       GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
463     }
464   }
465   else {
466     if (chain->num_enabled > 0)
467       chain->num_enabled--;
468
469     GST_INFO (GST_CAT_SCHEDULING, "disable group %p in chain %p, now %d groups enabled out of %d", group, chain,
470                     chain->num_enabled, chain->num_groups);
471
472     if (chain->num_enabled == 0) {
473       GST_INFO (GST_CAT_SCHEDULING, "disable chain %p", chain);
474       GST_OPT_SCHEDULER_CHAIN_DISABLE (chain);
475     }
476   }
477 }
478
479 static GstOptSchedulerGroup*
480 add_to_group (GstOptSchedulerGroup *group, GstElement *element)
481 {
482   g_assert (group != NULL);
483   g_assert (element != NULL);
484
485   GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to group %p", GST_ELEMENT_NAME (element), group);
486
487   if (GST_ELEMENT_IS_DECOUPLED (element)) {
488     GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" is decoupled, not adding to group %p", GST_ELEMENT_NAME (element), group);
489     return group;
490   }
491
492   g_assert (GST_ELEMENT_SCHED_GROUP (element) == NULL);
493
494   group->elements = g_slist_prepend (group->elements, element);
495   group->num_elements++;
496
497   GST_ELEMENT_SCHED_GROUP (element) = group;
498
499   return group;
500 }
501
502 static GstOptSchedulerGroup*
503 create_group (GstOptSchedulerChain *chain, GstElement *element)
504 {
505   GstOptSchedulerGroup *group;
506
507   group = g_new0 (GstOptSchedulerGroup, 1);
508   GST_INFO (GST_CAT_SCHEDULING, "new group %p", group);
509
510   add_to_group (group, element);
511   add_to_chain (chain, group);
512   
513   return group;
514 }
515
516 static void 
517 destroy_group_scheduler (GstOptSchedulerGroup *group) 
518 {
519   g_assert (group);
520
521   if (group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)
522     g_warning ("removing running element");
523
524 #ifdef USE_COTHREADS
525   if (group->cothread) {
526     do_cothread_destroy (group->cothread);
527     group->cothread = NULL;
528   }
529 #else
530   group->schedulefunc = NULL;
531   group->argc = 0;
532   group->argv = NULL;
533 #endif
534
535   group->flags &= ~GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
536 }
537
538 static void
539 delete_group (GstOptSchedulerGroup *group)
540 {
541   GSList *elements;
542   
543   GST_INFO (GST_CAT_SCHEDULING, "delete group %p", group);
544
545   g_assert (group != NULL);
546   g_assert (group->chain == NULL);
547
548   if (group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)
549     destroy_group_scheduler (group);
550
551   /* remove all elements from the group */
552   elements = group->elements;
553   while (elements) {
554     GstElement *element = GST_ELEMENT (elements->data);
555
556     GST_ELEMENT_SCHED_GROUP (element) = NULL;
557
558     elements = g_slist_next (elements);
559   }
560
561   g_slist_free (group->elements);
562   g_free (group);
563 }
564
565 static GstOptSchedulerGroup*
566 remove_from_group (GstOptSchedulerGroup *group, GstElement *element)
567 {
568   GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from group %p", GST_ELEMENT_NAME (element), group);
569
570   g_assert (group != NULL);
571   g_assert (element != NULL);
572
573   group->elements = g_slist_remove (group->elements, element);
574   group->num_elements--;
575
576   GST_ELEMENT_SCHED_GROUP (element) = NULL;
577
578   if (group->num_elements == 0) {
579     GST_INFO (GST_CAT_SCHEDULING, "group %p is empty, deleting", group);
580     remove_from_chain (group->chain, group);
581     delete_group (group);
582     return NULL;
583   }
584   return group;
585 }
586
587 static GstOptSchedulerGroup*
588 merge_groups (GstOptSchedulerGroup *group1, GstOptSchedulerGroup *group2)
589 {
590   g_assert (group1 != NULL);
591
592   GST_INFO (GST_CAT_SCHEDULING, "merging groups %p and %p", group1, group2);
593   
594   if (group1 == group2 || group2 == NULL)
595     return group1;
596
597   while (group2) {
598     GstElement *element = (GstElement *)group2->elements->data;
599    
600     group2 = remove_from_group (group2, element);
601     add_to_group (group1, element);
602   }
603   
604   return group1;
605 }
606
607 static void
608 group_error_handler (GstOptSchedulerGroup *group) 
609 {
610   chain_group_set_enabled (group->chain, group, FALSE);
611   group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
612 }
613
614 /* this function enables/disables an element, it will set/clear a flag on the element 
615  * and tells the chain that the group is enabled if all elements inside the group are
616  * enabled */
617 static void
618 group_element_set_enabled (GstOptSchedulerGroup *group, GstElement *element, gboolean enabled)
619 {
620   g_assert (group != NULL);
621   g_assert (element != NULL);
622
623   GST_INFO (GST_CAT_SCHEDULING, "request to %d element %s in group %p, have %d elements enabled out of %d", 
624                     enabled, GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
625
626   if (enabled) {
627     if (group->num_enabled < group->num_elements)
628       group->num_enabled++;
629
630     GST_INFO (GST_CAT_SCHEDULING, "enable element %s in group %p, now %d elements enabled out of %d", 
631                     GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
632
633     if (group->num_enabled == group->num_elements) {
634       GST_INFO (GST_CAT_SCHEDULING, "enable group %p", group);
635       chain_group_set_enabled (group->chain, group, TRUE);
636     }
637   }
638   else {
639     if (group->num_enabled > 0)
640       group->num_enabled--;
641
642     GST_INFO (GST_CAT_SCHEDULING, "disable element %s in group %p, now %d elements enabled out of %d", 
643                     GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
644
645     if (group->num_enabled == 0) {
646       GST_INFO (GST_CAT_SCHEDULING, "disable group %p", group);
647       chain_group_set_enabled (group->chain, group, FALSE);
648     }
649   }
650 }
651
652 /* a group is scheduled by doing a cothread switch to it or
653  * by calling the schedule function. In the non-cothread case
654  * we cannot run already running groups so we return FALSE here
655  * to indicate this to the caller */
656 static gboolean 
657 schedule_group (GstOptSchedulerGroup *group) 
658 {
659   if (!group->entry)
660     return FALSE;
661
662 #ifdef USE_COTHREADS
663   if (group->cothread)
664     do_cothread_switch (group->cothread);
665   else
666     g_warning ("(internal error): trying to schedule group without cothread");
667   return TRUE;
668 #else
669   group->schedulefunc (group->argc, group->argv);
670   return TRUE;
671 #endif
672 }
673
674 #ifndef USE_COTHREADS
675 static void
676 gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
677 {
678   GST_INFO (GST_CAT_SCHEDULING, "entering scheduler run queue recursion %d", osched->recursion);
679
680   /* make sure we don't exceed max_recursion */
681   if (osched->recursion > osched->max_recursion) {
682     osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
683     return;
684   }
685
686   osched->recursion++;
687
688   while (osched->runqueue) {
689     GstOptSchedulerGroup *group;
690     
691     group = (GstOptSchedulerGroup *) osched->runqueue->data;
692     osched->runqueue = g_list_remove (osched->runqueue, group);
693
694     GST_INFO (GST_CAT_SCHEDULING, "scheduling %p", group);
695
696     if (!schedule_group (group)) {
697       g_warning  ("error scheduling group %p", group);
698       group_error_handler (group);
699     }
700
701     GST_INFO (GST_CAT_SCHEDULING, "done scheduling %p", group);
702   }
703
704   GST_INFO (GST_CAT_SCHEDULING, "run queue length after scheduling %d", g_list_length (osched->runqueue));
705
706   osched->recursion--;
707 }
708 #endif
709
710 /* a chain is scheduled by picking the first active group and scheduling it */
711 static void 
712 schedule_chain (GstOptSchedulerChain *chain) 
713 {
714   GSList *groups;
715   GstOptScheduler *osched;
716
717   osched = chain->sched;
718   groups = chain->groups;
719
720   while (groups) {
721     GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
722
723     groups = g_slist_next (groups);
724
725     if (!GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group)) {
726
727       GST_INFO (GST_CAT_SCHEDULING, "scheduling group %p in chain %p", 
728                 group, chain);
729
730 #ifdef USE_COTHREADS
731       schedule_group (group);
732 #else
733       osched->recursion = 0;
734       osched->runqueue = g_list_append (osched->runqueue, group);
735       gst_opt_scheduler_schedule_run_queue (osched);
736 #endif
737
738       GST_INFO (GST_CAT_SCHEDULING, "done scheduling group %p in chain %p", 
739                 group, chain);
740       break;
741     }
742   }
743 }
744
745 /* a get-based group is scheduled by getting a buffer from the get based
746  * entry point and by pushing the buffer to the peer.
747  * We also set the running flag on this group for as long as this
748  * function is running. */
749 static int
750 get_group_schedule_function (int argc, char *argv[])
751 {
752   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
753   const GList *pads = gst_element_get_pad_list (group->entry);
754
755   GST_INFO (GST_CAT_SCHEDULING, "get wrapper of group %p", group);
756
757   group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
758
759   while (pads) {
760     GstBuffer *buffer;
761     GstPad *pad = GST_PAD_CAST (pads->data);
762     pads = g_list_next (pads);
763
764     /* skip sinks and ghostpads */
765     if (!GST_PAD_IS_SRC (pad) || !GST_IS_REAL_PAD (pad))
766       continue;
767
768     GST_INFO (GST_CAT_SCHEDULING, "doing get and push on pad \"%s:%s\" in group %p", 
769               GST_DEBUG_PAD_NAME (pad), group);
770
771     buffer = GST_RPAD_GETFUNC (pad) (pad);
772     if (buffer) {
773       if (GST_EVENT_IS_INTERRUPT (buffer)) {
774         gst_event_unref (GST_EVENT (buffer));
775         break;
776       }
777       gst_pad_push (pad, buffer);
778     }
779   }
780
781   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
782
783   return 0;
784 }
785
786 /* a loop-based group is scheduled by calling the loop function
787  * on the entry point. 
788  * We also set the running flag on this group for as long as this
789  * function is running. */
790 static int
791 loop_group_schedule_function (int argc, char *argv[])
792 {
793   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
794   GstElement *entry = group->entry;
795
796   GST_INFO (GST_CAT_SCHEDULING, "loop wrapper of group %p", group);
797
798   group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
799
800   GST_INFO (GST_CAT_SCHEDULING, "calling loopfunc of element %s in group %p", 
801             GST_ELEMENT_NAME (entry), group);
802
803   entry->loopfunc (entry);
804
805   group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
806
807   return 0;
808
809 }
810
811 /* the function to schedule an unkown group, which just gives an error */
812 static int
813 unkown_group_schedule_function (int argc, char *argv[])
814 {
815   GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
816
817   g_warning ("(internal error) unkown group type %d, disabling\n", group->type);
818   group_error_handler (group);
819
820   return 0;
821 }
822
823 /* this function is called when the first element of a chain-loop or a loop-loop
824  * link performs a push to the loop element. We then schedule the
825  * group with the loop-based element until the bufpen is empty */
826 static void
827 gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstBuffer *buffer)
828 {
829   GstOptSchedulerGroup *group;
830   GstOptScheduler *osched;
831
832   GST_INFO (GST_CAT_SCHEDULING, "loop wrapper, putting buffer in bufpen");
833
834   group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (sinkpad));
835   osched = group->chain->sched;
836
837
838 #ifdef USE_COTHREADS
839   if (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad))) {
840     g_warning ("deadlock detected, disabling group %p", group);
841     group_error_handler (group);
842   }
843   else {
844     GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), buffer);
845     schedule_group (group);
846   }
847 #else
848   GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), buffer);
849   if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
850     osched->runqueue = g_list_append (osched->runqueue, group);
851   }
852 #endif
853   
854   GST_INFO (GST_CAT_SCHEDULING, "after loop wrapper buflist %d", 
855             g_list_length (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad))));
856 }
857
858 /* this function is called by a loop based element that performs a
859  * pull on a sinkpad. We schedule the peer group until the bufpen
860  * is filled with the buffer so that this function  can return */
861 static GstBuffer*
862 gst_opt_scheduler_get_wrapper (GstPad *srcpad)
863 {
864   GstBuffer *buffer = NULL;
865
866   GST_INFO (GST_CAT_SCHEDULING, "get wrapper, removing buffer from bufpen");
867
868   if (GST_PAD_BUFLIST (srcpad))
869     buffer = GST_PAD_BUFLIST (srcpad)->data;
870
871   while (!buffer) {
872     GstOptSchedulerGroup *group;
873     GstOptScheduler *osched;
874     
875     group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (srcpad));
876     osched = group->chain->sched;
877
878 #ifdef USE_COTHREADS
879     schedule_group (group);
880 #else
881     if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
882       osched->runqueue = g_list_append (osched->runqueue, group);
883       gst_opt_scheduler_schedule_run_queue (osched);
884     }
885     else {
886       g_warning ("deadlock detected, disabling group %p", group);
887       group_error_handler (group);
888       return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
889     }
890 #endif
891     /* if the scheduler interrupted, make sure we send an INTERRUPTED event to the
892      * >        * loop based element */
893     if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
894       return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
895     }
896     
897     if (GST_PAD_BUFLIST (srcpad)) {
898       buffer = (GstBuffer *) GST_PAD_BUFLIST (srcpad)->data;
899     }
900   }
901   GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), buffer);
902
903   GST_INFO (GST_CAT_SCHEDULING, "get wrapper, returning buffer %d",
904             g_list_length (GST_PAD_BUFLIST (srcpad)));
905
906   return buffer;
907 }
908
909 /* this function is a chain wrapper for non-event-aware plugins,
910  * it'll simply dispatch the events to the (default) event handler */
911 static void
912 gst_opt_scheduler_chain_wrapper (GstPad *sinkpad, GstBuffer *buffer)
913 {
914   if (GST_IS_EVENT (buffer)) {
915     gst_pad_send_event (sinkpad, GST_EVENT (buffer));
916   }
917   else {
918     GST_RPAD_CHAINFUNC (sinkpad) (sinkpad, buffer);
919   }
920 }
921
922 /* setup the scheduler context for a group. The right schedule function
923  * is selected based on the group type and cothreads are created if 
924  * needed */
925 static void 
926 setup_group_scheduler (GstOptScheduler *osched, GstOptSchedulerGroup *group) 
927 {
928   GroupScheduleFunction wrapper;
929
930   wrapper = unkown_group_schedule_function;
931
932   /* figure out the wrapper function for this group */
933   if (group->type == GST_OPT_SCHEDULER_GROUP_GET)
934     wrapper = get_group_schedule_function;
935   else if (group->type == GST_OPT_SCHEDULER_GROUP_LOOP)
936     wrapper = loop_group_schedule_function;
937         
938 #ifdef USE_COTHREADS
939   if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
940     do_cothread_create (group->cothread, osched->context,
941                       (cothread_func) wrapper, 0, (char **) group);
942   }
943   else {
944     do_cothread_setfunc (group->cothread, osched->context,
945                       (cothread_func) wrapper, 0, (char **) group);
946   }
947 #else
948   group->schedulefunc = wrapper;
949   group->argc = 0;
950   group->argv = (char **) group;
951 #endif
952   group->flags |= GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
953 }
954
955 static GstElementStateReturn
956 gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition)
957 {
958   GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
959   GstOptSchedulerGroup *group;
960   GstElementStateReturn res = GST_STATE_SUCCESS;
961   
962   GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" state change %d", GST_ELEMENT_NAME (element), transition);
963
964   /* we check the state of the managing pipeline here */
965   if (GST_IS_BIN (element)) {
966     if (GST_SCHEDULER_PARENT (sched) == element) {
967       GST_INFO (GST_CAT_SCHEDULING, "parent \"%s\" changed state", GST_ELEMENT_NAME (element));
968
969       switch (transition) {
970         case GST_STATE_PLAYING_TO_PAUSED:
971           GST_INFO (GST_CAT_SCHEDULING, "setting scheduler state to stopped");
972           GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_STOPPED;
973           break;
974         case GST_STATE_PAUSED_TO_PLAYING:
975           GST_INFO (GST_CAT_SCHEDULING, "setting scheduler state to running");
976           GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_RUNNING;
977           break;
978         default:
979           GST_INFO (GST_CAT_SCHEDULING, "no interesting state change, doing nothing");
980       }
981     }
982     return res;
983   }
984
985   /* we don't care about decoupled elements after this */
986   if (GST_ELEMENT_IS_DECOUPLED (element))
987     return GST_STATE_SUCCESS;
988
989   /* get the group of the element */
990   group = GST_ELEMENT_SCHED_GROUP (element);
991
992   switch (transition) {
993     case GST_STATE_PAUSED_TO_PLAYING:
994       /* an element withut a group has to be an unlinked src, sink
995        * filter element */
996       if (!group)
997         res = GST_STATE_FAILURE;
998       /* else construct the scheduling context of this group and enable it */
999       else {
1000         setup_group_scheduler (osched, group);
1001         group_element_set_enabled (group, element, TRUE);
1002       }
1003       break;
1004     case GST_STATE_PLAYING_TO_PAUSED:
1005       /* if the element still has a group, we disable it */
1006       if (group) 
1007         group_element_set_enabled (group, element, FALSE);
1008       break;
1009     default:
1010       break;
1011   }
1012
1013   return res;
1014 }
1015
1016 static void
1017 gst_opt_scheduler_scheduling_change (GstScheduler *sched, GstElement *element)
1018 {
1019   g_warning ("scheduling change, implement me");
1020 }
1021
1022 static void
1023 get_group (GstElement *element, GstOptSchedulerGroup **group)
1024 {
1025   GstOptSchedulerCtx *ctx;
1026
1027   ctx = GST_ELEMENT_SCHED_CONTEXT (element);
1028   if (ctx) 
1029     *group = ctx->group;
1030   else
1031     *group = NULL;
1032 }
1033
1034 /*
1035  * the idea is to put the two elements into the same group. 
1036  * - When no element is inside a group, we create a new group and add 
1037  *   the elements to it. 
1038  * - When one of the elements has a group, add the other element to 
1039  *   that group
1040  * - if both of the elements have a group, we merge the groups, which
1041  *   will also merge the chains.
1042  */
1043 static GstOptSchedulerGroup*
1044 group_elements (GstOptScheduler *osched, GstElement *element1, GstElement *element2)
1045 {
1046   GstOptSchedulerGroup *group1, *group2, *group = NULL;
1047   
1048   get_group (element1, &group1);
1049   get_group (element2, &group2);
1050   
1051   /* none of the elements is added to a group, create a new group
1052    * and chain to add the elements to */
1053   if (!group1 && !group2) {
1054     GstOptSchedulerChain *chain;
1055
1056     GST_INFO (GST_CAT_SCHEDULING, "creating new group to hold \"%s\" and \"%s\"", 
1057                   GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
1058
1059     chain = create_chain (osched);
1060     group = create_group (chain, element1);
1061     add_to_group (group, element2);
1062   }
1063   /* the first element has a group */
1064   else if (group1) {
1065     GST_INFO (GST_CAT_SCHEDULING, "adding \"%s\" to \"%s\"'s group", 
1066                   GST_ELEMENT_NAME (element2), GST_ELEMENT_NAME (element1));
1067
1068     /* the second element also has a group, merge */
1069     if (group2)
1070       merge_groups (group1, group2);
1071     /* the second element has no group, add it to the group
1072      * of the first element */
1073     else
1074       add_to_group (group1, element2);
1075
1076     group = group1;
1077   }
1078   /* element1 has no group, element2 does. Add element1 to the
1079    * group of element2 */
1080   else {
1081     GST_INFO (GST_CAT_SCHEDULING, "adding \"%s\" to \"%s\"'s group", 
1082                   GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
1083     add_to_group (group2, element1);
1084     group = group2;
1085   }
1086   return group;
1087 }
1088
1089 typedef enum {
1090   GST_OPT_INVALID,
1091   GST_OPT_GET_TO_CHAIN,
1092   GST_OPT_LOOP_TO_CHAIN,
1093   GST_OPT_GET_TO_LOOP,
1094   GST_OPT_CHAIN_TO_CHAIN,
1095   GST_OPT_CHAIN_TO_LOOP,
1096   GST_OPT_LOOP_TO_LOOP,
1097 } LinkType;
1098
1099 /*
1100  * Entry points for this scheduler.
1101  */
1102 static void
1103 gst_opt_scheduler_setup (GstScheduler *sched)
1104 {   
1105 #ifdef USE_COTHREADS
1106   GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
1107               
1108   /* first create thread context */
1109   if (osched->context == NULL) {
1110     GST_DEBUG (GST_CAT_SCHEDULING, "initializing cothread context");
1111     osched->context = do_cothread_context_init ();
1112   }
1113 #endif
1114
1115   
1116 static void 
1117 gst_opt_scheduler_reset (GstScheduler *sched)
1118
1119 #ifdef USE_COTHREADS
1120   GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
1121   GSList *chains = osched->chains;
1122
1123   while (chains) {
1124     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
1125     GSList *groups = chain->groups;
1126
1127     while (groups) {
1128       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
1129
1130       destroy_group_scheduler (group);
1131       groups = groups->next;
1132     }
1133     chains = chains->next;
1134   }
1135               
1136   if (osched->context) {
1137     do_cothread_context_destroy (osched->context);
1138     osched->context = NULL; 
1139   }
1140 #endif
1141 }     
1142 static void
1143 gst_opt_scheduler_add_element (GstScheduler *sched, GstElement *element)
1144 {
1145   GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
1146   GstOptSchedulerCtx *ctx;
1147
1148   GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to scheduler", GST_ELEMENT_NAME (element));
1149
1150   /* decoupled elements are not added to the scheduler lists */
1151   if (GST_ELEMENT_IS_DECOUPLED (element))
1152     return;
1153
1154   ctx = g_new0 (GstOptSchedulerCtx, 1);
1155   GST_ELEMENT_SCHED_CONTEXT (element) = ctx;
1156
1157   /* loop based elements *always* end up in their own group. It can eventually
1158    * be merged with another group when a link is made */
1159   if (element->loopfunc) {
1160     GstOptSchedulerGroup *group;
1161     GstOptSchedulerChain *chain;
1162
1163     chain = create_chain (osched);
1164
1165     group = create_group (chain, element);
1166     group->entry = element;
1167     group->type = GST_OPT_SCHEDULER_GROUP_LOOP;
1168
1169     GST_INFO (GST_CAT_SCHEDULING, "added element \"%s\" as loop based entry", GST_ELEMENT_NAME (element));
1170   }
1171 }
1172
1173 static void
1174 gst_opt_scheduler_remove_element (GstScheduler *sched, GstElement *element)
1175 {
1176   GstOptSchedulerGroup *group;
1177
1178   GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from scheduler", GST_ELEMENT_NAME (element));
1179
1180   /* decoupled elements are not added to the scheduler lists and should therefore
1181    * no be removed */
1182   if (GST_ELEMENT_IS_DECOUPLED (element))
1183     return;
1184
1185   /* the element is guaranteed to live in it's own group/chain now */
1186   get_group (element, &group);
1187   if (group) {
1188     GstOptSchedulerChain *chain;
1189     
1190     GST_ELEMENT_SCHED_GROUP (element) = NULL;
1191
1192     chain = group->chain;
1193     if (chain) {
1194       remove_from_chain (chain, group);
1195     }
1196     delete_group (group);
1197   }
1198
1199   g_free (GST_ELEMENT_SCHED_CONTEXT (element));
1200   GST_ELEMENT_SCHED_CONTEXT (element) = NULL;
1201 }
1202
1203 static void
1204 gst_opt_scheduler_lock_element (GstScheduler *sched, GstElement *element)
1205 {
1206   //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
1207   g_warning ("lock element, implement me");
1208 }
1209
1210 static void
1211 gst_opt_scheduler_unlock_element (GstScheduler *sched, GstElement *element)
1212 {
1213   //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
1214   g_warning ("unlock element, implement me");
1215 }
1216
1217 static gboolean
1218 gst_opt_scheduler_yield (GstScheduler *sched, GstElement *element)
1219 {
1220 #ifdef USE_COTHREADS
1221   /* yield hands control to the main cothread context if the requesting 
1222    * element is the entry point of the group */
1223   GstOptSchedulerGroup *group;
1224   get_group (element, &group);
1225   if (group && group->entry == element)
1226     do_cothread_switch (do_cothread_get_main (((GstOptScheduler*)sched)->context)); 
1227
1228   return FALSE;
1229 #else
1230   g_warning ("element %s performs a yield, please fix the element", 
1231                   GST_ELEMENT_NAME (element));
1232   return TRUE;
1233 #endif
1234 }
1235
1236 static gboolean
1237 gst_opt_scheduler_interrupt (GstScheduler *sched, GstElement *element)
1238 {
1239   GST_INFO (GST_CAT_SCHEDULING, "interrupt from \"%s\"", 
1240             GST_ELEMENT_NAME (element));
1241
1242 #ifdef USE_COTHREADS
1243   do_cothread_switch (do_cothread_get_main (((GstOptScheduler*)sched)->context)); 
1244   return FALSE;
1245 #else
1246   {
1247     GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
1248  
1249     osched->state = GST_OPT_SCHEDULER_STATE_INTERRUPTED;
1250   }
1251   return TRUE;
1252 #endif
1253 }
1254
1255 static void
1256 gst_opt_scheduler_error (GstScheduler *sched, GstElement *element)
1257 {
1258   GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
1259   GstOptSchedulerGroup *group;
1260   get_group (element, &group);
1261   if (group)
1262     group_error_handler (group);
1263
1264   osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
1265 }
1266
1267 /* link pads, merge groups and chains */
1268 static void
1269 gst_opt_scheduler_pad_link (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
1270 {
1271   GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
1272   LinkType type = GST_OPT_INVALID;
1273   GstElement *element1, *element2;
1274
1275   GST_INFO (GST_CAT_SCHEDULING, "pad link between \"%s:%s\" and \"%s:%s\"", 
1276                   GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
1277
1278   element1 = GST_PAD_PARENT (srcpad);
1279   element2 = GST_PAD_PARENT (sinkpad);
1280
1281   /* first we need to figure out what type of link we're dealing
1282    * with */
1283   if (element1->loopfunc && element2->loopfunc)
1284     type = GST_OPT_LOOP_TO_LOOP;
1285   else {
1286     if (element1->loopfunc) {
1287       if (GST_RPAD_CHAINFUNC (sinkpad))
1288         type = GST_OPT_LOOP_TO_CHAIN;
1289     }
1290     else if (element2->loopfunc) {
1291       if (GST_RPAD_GETFUNC (srcpad)) {
1292         type = GST_OPT_GET_TO_LOOP;
1293         /* this could be tricky, the get based source could 
1294          * already be part of a loop based group in another pad,
1295          * we assert on that for now */
1296         if (GST_ELEMENT_SCHED_CONTEXT (element1) != NULL &&
1297             GST_ELEMENT_SCHED_GROUP (element1) != NULL) 
1298         {
1299           GstOptSchedulerGroup *group = GST_ELEMENT_SCHED_GROUP (element1);
1300
1301           /* if the loop based element is the entry point we're ok, if it
1302            * isn't then we have multiple loop based elements in this group */
1303           if (group->entry != element2) {
1304             g_error ("internal error: cannot schedule get to loop in multi-loop based group");
1305             return;
1306           }
1307         }
1308       }
1309       else
1310         type = GST_OPT_CHAIN_TO_LOOP;
1311     }
1312     else {
1313       if (GST_RPAD_GETFUNC (srcpad) && GST_RPAD_CHAINFUNC (sinkpad)) {
1314         type = GST_OPT_GET_TO_CHAIN;
1315         /* the get based source could already be part of a loop 
1316          * based group in another pad, we assert on that for now */
1317         if (GST_ELEMENT_SCHED_CONTEXT (element1) != NULL &&
1318             GST_ELEMENT_SCHED_GROUP (element1) != NULL) 
1319         {
1320           GstOptSchedulerGroup *group = GST_ELEMENT_SCHED_GROUP (element1);
1321
1322           /* if the get based element is the entry point we're ok, if it
1323            * isn't then we have a mixed loop/chain based group */
1324           if (group->entry != element1) {
1325             g_error ("internal error: cannot schedule get to chain with mixed loop/chain based group");
1326             return;
1327           }
1328         }
1329       }
1330       else 
1331         type = GST_OPT_CHAIN_TO_CHAIN;
1332     }
1333   }
1334   
1335   /* for each link type, perform specific actions */
1336   switch (type) {
1337     case GST_OPT_GET_TO_CHAIN:
1338     {
1339       GstOptSchedulerGroup *group = NULL;
1340
1341       GST_INFO (GST_CAT_SCHEDULING, "get to chain based link");
1342
1343       /* setup get/chain handlers */
1344       GST_RPAD_GETHANDLER (srcpad) = GST_RPAD_GETFUNC (srcpad);
1345       if (GST_ELEMENT_IS_EVENT_AWARE (element2))
1346         GST_RPAD_CHAINHANDLER (sinkpad) = GST_RPAD_CHAINFUNC (sinkpad);
1347       else
1348         GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_chain_wrapper;
1349
1350       /* the two elements should be put into the same group, 
1351        * this also means that they are in the same chain automatically */
1352       group = group_elements (osched, element1, element2);
1353
1354       /* if there is not yet an entry in the group, select the source
1355        * element as the entry point */
1356       if (!group->entry) {
1357         group->entry = element1;
1358         group->type = GST_OPT_SCHEDULER_GROUP_GET;
1359
1360         GST_INFO (GST_CAT_SCHEDULING, "setting \"%s\" as entry point of _get-based group %p", 
1361                   GST_ELEMENT_NAME (element1), group);
1362       }
1363       break;
1364     }
1365     case GST_OPT_LOOP_TO_CHAIN:
1366     case GST_OPT_CHAIN_TO_CHAIN:
1367       GST_INFO (GST_CAT_SCHEDULING, "loop/chain to chain based link");
1368
1369       if (GST_ELEMENT_IS_EVENT_AWARE (element2))
1370         GST_RPAD_CHAINHANDLER (sinkpad) = GST_RPAD_CHAINFUNC (sinkpad);
1371       else
1372         GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_chain_wrapper;
1373
1374       /* the two elements should be put into the same group, 
1375        * this also means that they are in the same chain automatically, 
1376        * in case of a loop-based element1, there will be a group for element1 and
1377        * element2 will be added to it. */
1378       group_elements (osched, element1, element2);
1379       break;
1380     case GST_OPT_GET_TO_LOOP:
1381       GST_INFO (GST_CAT_SCHEDULING, "get to loop based link");
1382
1383       GST_RPAD_GETHANDLER (srcpad) = GST_RPAD_GETFUNC (srcpad);
1384
1385       /* the two elements should be put into the same group, 
1386        * this also means that they are in the same chain automatically, 
1387        * element2 is loop-based so it already has a group where element1
1388        * will be added to */
1389       group_elements (osched, element1, element2);
1390       break;
1391     case GST_OPT_CHAIN_TO_LOOP:
1392     case GST_OPT_LOOP_TO_LOOP:
1393     {
1394       GstOptSchedulerGroup *group1, *group2;
1395
1396       GST_INFO (GST_CAT_SCHEDULING, "chain/loop to loop based link");
1397
1398       GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_loop_wrapper;
1399       GST_RPAD_GETHANDLER (srcpad) = gst_opt_scheduler_get_wrapper;
1400
1401       group1 = GST_ELEMENT_SCHED_GROUP (element1);
1402       group2 = GST_ELEMENT_SCHED_GROUP (element2);
1403
1404       g_assert (group2 != NULL);
1405
1406       /* group2 is guaranteed to exist as it contains a loop-based element.
1407        * group1 only exists if element1 is linked to some other element */
1408       if (!group1) {
1409         /* create a new group for element1 as it cannot be merged into another group
1410          * here. we create the group in the same chain as the loop-based element. */
1411         GST_INFO (GST_CAT_SCHEDULING, "creating new group for element %s", GST_ELEMENT_NAME (element1));
1412         group1 = create_group (group2->chain, element1);
1413       }
1414       else {
1415         /* both elements are already in a group, make sure they are added to
1416          * the same chain */
1417         merge_chains (group1->chain, group2->chain);
1418       }
1419       break;
1420     }
1421     case GST_OPT_INVALID:
1422       g_error ("(internal error) invalid element link, what are you doing?");
1423       break;
1424   }
1425 }
1426
1427 /* 
1428  * checks if an element is still linked to some other element in the group. 
1429  * no checking is done on the brokenpad arg 
1430  * */
1431 static gboolean
1432 element_has_link_with_group (GstElement *element, GstOptSchedulerGroup *group, GstPad *brokenpad)
1433 {
1434   gboolean linked = FALSE;
1435   const GList *pads;
1436
1437   /* see if the element has no more links to the peer group */
1438   pads = gst_element_get_pad_list (element);
1439   while (pads && !linked) {
1440     GstPad *pad = GST_PAD_CAST (pads->data);
1441     pads = g_list_next (pads);
1442
1443     /* we only operate on real pads and on the pad that is not broken */
1444     if (!GST_IS_REAL_PAD (pad) || pad == brokenpad)
1445       continue;
1446
1447     if (GST_PAD_PEER (pad)) {
1448       GstElement *parent;
1449       GstOptSchedulerGroup *parentgroup;
1450
1451       /* see in what group this element is */
1452       parent = GST_PAD_PARENT (GST_PAD_PEER (pad));
1453
1454       /* links with decoupled elements are valid */
1455       if (GST_ELEMENT_IS_DECOUPLED (parent)) {
1456         linked = TRUE;
1457       }
1458       else {
1459         /* for non-decoupled elements we need to check the group */
1460         get_group (parent, &parentgroup);
1461
1462         /* if it's in the same group, we're still linked */
1463         if (parentgroup == group)
1464           linked = TRUE;
1465       }
1466     } 
1467   }
1468   return linked;
1469 }
1470
1471 static void
1472 gst_opt_scheduler_pad_unlink (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
1473 {
1474   //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
1475   GstElement *element1, *element2;
1476   GstOptSchedulerGroup *group1, *group2;
1477
1478   GST_INFO (GST_CAT_SCHEDULING, "pad unlink between \"%s:%s\" and \"%s:%s\"", 
1479                   GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
1480
1481   element1 = GST_PAD_PARENT (srcpad);
1482   element2 = GST_PAD_PARENT (sinkpad);
1483   
1484   get_group (element1, &group1);
1485   get_group (element2, &group2);
1486
1487   /* if one the elements has no group (anymore) we don't really care 
1488    * about the link */
1489   if (!group1 || !group2) {
1490     GST_INFO (GST_CAT_SCHEDULING, "one (or both) of the elements is not in a group, not interesting");
1491     return;
1492   }
1493
1494   /* easy part, groups are different */
1495   if (group1 != group2) {
1496     GST_INFO (GST_CAT_SCHEDULING, "elements are in different groups");
1497
1498     /* FIXME, need to eventually break the chain */
1499     g_warning ("pad unlink for different groups, implement me");
1500   }
1501   /* hard part, groups are equal */
1502   else {
1503     gboolean still_link1, still_link2;
1504     GstOptSchedulerGroup *group;
1505     
1506     /* since group1 == group2, it doesn't matter which group we take */
1507     group = group1;
1508
1509     GST_INFO (GST_CAT_SCHEDULING, "elements are in the same group %p", group);
1510
1511     /* check if the element is still linked to some other element in the group,
1512      * we pass the pad that is broken up as an arg because a link on that pad
1513      * is not valid anymore */
1514     still_link1 = element_has_link_with_group (element1, group, srcpad);
1515     still_link2 = element_has_link_with_group (element2, group, sinkpad);
1516
1517     /* if there is still a link, we don't need to break this group */
1518     if (still_link1 && still_link2) {
1519       GST_INFO (GST_CAT_SCHEDULING, "elements still have links with other elements in the group");
1520
1521       /* FIXME, need to check for breaking up the group */
1522       return;
1523     }
1524
1525     /* now check which one of the elements we can remove from the group */
1526     if (!still_link1) {
1527       GST_INFO (GST_CAT_SCHEDULING, "element1 is separated from the group");
1528       /* see if the element was an entry point for the group */
1529       if (group->entry == element1) {
1530         /* we're going to remove the element so we need to clear it as the
1531          * entry point */
1532         group->entry = NULL;
1533       }
1534       remove_from_group (group, element1);
1535     }
1536     if (!still_link2) {
1537       GST_INFO (GST_CAT_SCHEDULING, "element2 is separated from the group");
1538
1539       /* see if the element was an entry point for the group */
1540       if (group->entry == element2) {
1541         /* we're going to remove the element so we need to clear it as the
1542          * entry point */
1543         group->entry = NULL;
1544       }
1545       remove_from_group (group, element2);
1546     }
1547   }
1548 }
1549
1550 static GstPad*
1551 gst_opt_scheduler_pad_select (GstScheduler *sched, GList *padlist)
1552 {
1553   //GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
1554   
1555   g_warning ("pad select, implement me");
1556
1557   return NULL;
1558 }
1559
1560 static GstClockReturn
1561 gst_opt_scheduler_clock_wait (GstScheduler *sched, GstElement *element,
1562                               GstClockID id, GstClockTimeDiff *jitter)
1563 {
1564   return gst_clock_id_wait (id, jitter);
1565 }
1566
1567 /* a scheduler iteration is done by looping and scheduling the active chains */
1568 static GstSchedulerState
1569 gst_opt_scheduler_iterate (GstScheduler *sched)
1570 {
1571   GstSchedulerState state = GST_SCHEDULER_STATE_STOPPED;
1572   GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
1573   gint iterations = osched->iterations;
1574
1575   osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
1576
1577   while (iterations) {
1578     gboolean scheduled = FALSE;
1579     GSList *chains;
1580
1581     /* we have to schedule each of the scheduler chains now */
1582     chains = osched->chains;
1583     while (chains) {
1584       GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
1585       chains = g_slist_next (chains);
1586
1587       /* if the chain is not disabled, schedule it */
1588       if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
1589         schedule_chain (chain);
1590         scheduled = TRUE;
1591       }
1592
1593       /* don't schedule any more chains when interrupted or in error */
1594       if (osched->state != GST_OPT_SCHEDULER_STATE_RUNNING)
1595         break;
1596     }
1597
1598     /* at this point it's possible that the scheduler state is
1599      * in error, we then return an error */
1600     if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
1601       state = GST_SCHEDULER_STATE_ERROR;
1602       break;
1603     }
1604     else {
1605       /* if chains were scheduled, return our current state */
1606       if (scheduled)
1607         state = GST_SCHEDULER_STATE (sched);
1608       /* if no chains were scheduled, we say we are stopped */
1609       else {
1610         state = GST_SCHEDULER_STATE_STOPPED;
1611         break;
1612       }
1613     }
1614     if (iterations > 0)
1615       iterations--;
1616   }
1617
1618   return state;
1619 }
1620
1621
1622 static void
1623 gst_opt_scheduler_show (GstScheduler *sched)
1624 {
1625   GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
1626   GSList *chains;
1627
1628   g_print ("iterations:    %d\n", osched->iterations);
1629   g_print ("max recursion: %d\n", osched->max_recursion);
1630
1631   chains = osched->chains;
1632   while (chains) {
1633     GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
1634     GSList *groups = chain->groups;
1635     chains = g_slist_next (chains);
1636
1637     g_print ("+- chain %p: %d groups, %d enabled, flags %d\n", chain, chain->num_groups, chain->num_enabled, chain->flags);
1638
1639     while (groups) {
1640       GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
1641       GSList *elements = group->elements;
1642       groups = g_slist_next (groups);
1643
1644       g_print (" +- group %p: %d elements, %d enabled, flags %d, entry %s, %s\n", 
1645                       group, group->num_elements, group->num_enabled, group->flags,
1646                       (group->entry ? GST_ELEMENT_NAME (group->entry): "(none)"),
1647                       (group->type == GST_OPT_SCHEDULER_GROUP_GET ? "get-based" : "loop-based") );
1648
1649       while (elements) {
1650         GstElement *element = (GstElement *) elements->data;
1651         elements = g_slist_next (elements);
1652
1653         g_print ("  +- element %s\n", GST_ELEMENT_NAME (element));
1654       }
1655     }
1656   }
1657 }
1658
1659 static void
1660 gst_opt_scheduler_get_property (GObject *object, guint prop_id,
1661                                 GValue *value, GParamSpec *pspec)
1662 {
1663   GstOptScheduler *osched;
1664   
1665   g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
1666
1667   osched = GST_OPT_SCHEDULER_CAST (object);
1668
1669   switch (prop_id) {
1670     case ARG_ITERATIONS:
1671       g_value_set_int (value, osched->iterations);
1672       break;
1673     case ARG_MAX_RECURSION:
1674       g_value_set_int (value, osched->max_recursion);
1675       break;
1676     default:
1677       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1678       break;
1679   }
1680 }
1681
1682 static void
1683 gst_opt_scheduler_set_property (GObject *object, guint prop_id,
1684                                 const GValue *value, GParamSpec *pspec)
1685 {
1686   GstOptScheduler *osched;
1687   
1688   g_return_if_fail (GST_IS_OPT_SCHEDULER (object));
1689
1690   osched = GST_OPT_SCHEDULER_CAST (object);
1691
1692   switch (prop_id) {
1693     case ARG_ITERATIONS:
1694       osched->iterations = g_value_get_int (value);
1695       break;
1696     case ARG_MAX_RECURSION:
1697       osched->max_recursion = g_value_get_int (value);
1698       break;
1699     default:
1700       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1701       break;
1702   }
1703 }
1704