Doc updates.
[platform/upstream/gstreamer.git] / gst / schedulers / gstbasicscheduler.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstscheduler.c: Default scheduling code for most cases
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #include <gst/gst.h>
28
29 #include "cothreads_compat.h"
30
31 GST_DEBUG_CATEGORY_STATIC (debug_dataflow);
32 GST_DEBUG_CATEGORY_STATIC (debug_scheduler);
33 #define GST_CAT_DEFAULT debug_scheduler
34
35 typedef struct _GstSchedulerChain GstSchedulerChain;
36
37 #define GST_ELEMENT_THREADSTATE(elem)   (GST_ELEMENT (elem)->sched_private)
38 #define GST_RPAD_BUFPEN(pad)            (GST_REAL_PAD(pad)->sched_private)
39
40 #define GST_ELEMENT_COTHREAD_STOPPING                   GST_ELEMENT_SCHEDULER_PRIVATE1
41 #define GST_ELEMENT_IS_COTHREAD_STOPPING(element)       GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
42
43 typedef struct _GstBasicScheduler GstBasicScheduler;
44 typedef struct _GstBasicSchedulerClass GstBasicSchedulerClass;
45
46 #ifdef _COTHREADS_STANDARD
47 # define _SCHEDULER_NAME "standard"
48 #else
49 # define _SCHEDULER_NAME "basic"
50 #endif
51
52 struct _GstSchedulerChain
53 {
54   GstBasicScheduler *sched;
55
56   GList *disabled;
57
58   GList *elements;
59   gint num_elements;
60
61   GstElement *entry;
62
63   gint cothreaded_elements;
64   gboolean schedule;
65 };
66
67 #define GST_TYPE_BASIC_SCHEDULER \
68   (gst_basic_scheduler_get_type())
69 #define GST_BASIC_SCHEDULER(obj) \
70   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_BASIC_SCHEDULER,GstBasicScheduler))
71 #define GST_BASIC_SCHEDULER_CLASS(klass) \
72   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_BASIC_SCHEDULER,GstBasicSchedulerClass))
73 #define GST_IS_BASIC_SCHEDULER(obj) \
74   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_BASIC_SCHEDULER))
75 #define GST_IS_BASIC_SCHEDULER_CLASS(obj) \
76   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_BASIC_SCHEDULER))
77
78 #define SCHED(element) GST_BASIC_SCHEDULER (GST_ELEMENT_SCHEDULER (element))
79
80 typedef enum
81 {
82   GST_BASIC_SCHEDULER_STATE_NONE,
83   GST_BASIC_SCHEDULER_STATE_STOPPED,
84   GST_BASIC_SCHEDULER_STATE_ERROR,
85   GST_BASIC_SCHEDULER_STATE_RUNNING
86 }
87 GstBasicSchedulerState;
88
89 typedef enum
90 {
91   /* something important has changed inside the scheduler */
92   GST_BASIC_SCHEDULER_CHANGE = GST_SCHEDULER_FLAG_LAST
93 }
94 GstBasicSchedulerFlags;
95
96 struct _GstBasicScheduler
97 {
98   GstScheduler parent;
99
100   GList *elements;
101   gint num_elements;
102
103   GList *chains;
104   gint num_chains;
105
106   GstBasicSchedulerState state;
107
108   cothread_context *context;
109   GstElement *current;
110 };
111
112 struct _GstBasicSchedulerClass
113 {
114   GstSchedulerClass parent_class;
115 };
116
117 static GType _gst_basic_scheduler_type = 0;
118
119 static void gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass);
120 static void gst_basic_scheduler_init (GstBasicScheduler * scheduler);
121
122 static void gst_basic_scheduler_dispose (GObject * object);
123
124 static void gst_basic_scheduler_setup (GstScheduler * sched);
125 static void gst_basic_scheduler_reset (GstScheduler * sched);
126 static void gst_basic_scheduler_add_element (GstScheduler * sched,
127     GstElement * element);
128 static void gst_basic_scheduler_remove_element (GstScheduler * sched,
129     GstElement * element);
130 static GstElementStateReturn gst_basic_scheduler_state_transition (GstScheduler
131     * sched, GstElement * element, gint transition);
132 static gboolean gst_basic_scheduler_yield (GstScheduler * sched,
133     GstElement * element);
134 static gboolean gst_basic_scheduler_interrupt (GstScheduler * sched,
135     GstElement * element);
136 static void gst_basic_scheduler_error (GstScheduler * sched,
137     GstElement * element);
138 static void gst_basic_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
139     GstPad * sinkpad);
140 static void gst_basic_scheduler_pad_unlink (GstScheduler * sched,
141     GstPad * srcpad, GstPad * sinkpad);
142 static GstData *gst_basic_scheduler_pad_select (GstScheduler * sched,
143     GstPad ** selected, GstPad ** padlist);
144 static GstSchedulerState gst_basic_scheduler_iterate (GstScheduler * sched);
145
146 static void gst_basic_scheduler_show (GstScheduler * sched);
147
148 static GstSchedulerClass *parent_class = NULL;
149
150 /* for threaded bins, these pre- and post-run functions lock and unlock the
151  * elements. we have to avoid deadlocks, so we make these convenience macros
152  * that will avoid using do_cothread_switch from within the scheduler. */
153
154 #define do_element_switch(element) G_STMT_START{                \
155   SCHED (element)->current = element;                           \
156   do_cothread_switch (GST_ELEMENT_THREADSTATE (element));       \
157 }G_STMT_END
158
159 #define do_switch_to_main(sched) G_STMT_START{                  \
160   ((GstBasicScheduler*) sched)->current = NULL;                         \
161   do_cothread_switch                                            \
162     (do_cothread_get_main                                       \
163        (((GstBasicScheduler*)sched)->context));                 \
164 }G_STMT_END
165
166 #define do_switch_from_main(entry) G_STMT_START{                \
167   SCHED (entry)->current = entry;                               \
168   do_cothread_switch (GST_ELEMENT_THREADSTATE (entry));         \
169 }G_STMT_END
170
171 static GType
172 gst_basic_scheduler_get_type (void)
173 {
174   if (!_gst_basic_scheduler_type) {
175     static const GTypeInfo scheduler_info = {
176       sizeof (GstBasicSchedulerClass),
177       NULL,
178       NULL,
179       (GClassInitFunc) gst_basic_scheduler_class_init,
180       NULL,
181       NULL,
182       sizeof (GstBasicScheduler),
183       0,
184       (GInstanceInitFunc) gst_basic_scheduler_init,
185       NULL
186     };
187
188     _gst_basic_scheduler_type =
189         g_type_register_static (GST_TYPE_SCHEDULER,
190         "Gst" COTHREADS_NAME_CAPITAL "Scheduler", &scheduler_info, 0);
191   }
192   return _gst_basic_scheduler_type;
193 }
194
195 static void
196 gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass)
197 {
198   GObjectClass *gobject_class;
199   GstObjectClass *gstobject_class;
200   GstSchedulerClass *gstscheduler_class;
201
202   gobject_class = (GObjectClass *) klass;
203   gstobject_class = (GstObjectClass *) klass;
204   gstscheduler_class = (GstSchedulerClass *) klass;
205
206   parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
207
208   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_basic_scheduler_dispose);
209
210   gstscheduler_class->setup = GST_DEBUG_FUNCPTR (gst_basic_scheduler_setup);
211   gstscheduler_class->reset = GST_DEBUG_FUNCPTR (gst_basic_scheduler_reset);
212   gstscheduler_class->add_element =
213       GST_DEBUG_FUNCPTR (gst_basic_scheduler_add_element);
214   gstscheduler_class->remove_element =
215       GST_DEBUG_FUNCPTR (gst_basic_scheduler_remove_element);
216   gstscheduler_class->state_transition =
217       GST_DEBUG_FUNCPTR (gst_basic_scheduler_state_transition);
218   gstscheduler_class->yield = GST_DEBUG_FUNCPTR (gst_basic_scheduler_yield);
219   gstscheduler_class->interrupt =
220       GST_DEBUG_FUNCPTR (gst_basic_scheduler_interrupt);
221   gstscheduler_class->error = GST_DEBUG_FUNCPTR (gst_basic_scheduler_error);
222   gstscheduler_class->pad_link =
223       GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_link);
224   gstscheduler_class->pad_unlink =
225       GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_unlink);
226   gstscheduler_class->pad_select =
227       GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_select);
228   gstscheduler_class->clock_wait = NULL;
229   gstscheduler_class->iterate = GST_DEBUG_FUNCPTR (gst_basic_scheduler_iterate);
230
231   gstscheduler_class->show = GST_DEBUG_FUNCPTR (gst_basic_scheduler_show);
232
233   do_cothreads_init (NULL);
234 }
235
236 static void
237 gst_basic_scheduler_init (GstBasicScheduler * scheduler)
238 {
239   scheduler->elements = NULL;
240   scheduler->num_elements = 0;
241   scheduler->chains = NULL;
242   scheduler->num_chains = 0;
243
244   GST_FLAG_SET (scheduler, GST_SCHEDULER_FLAG_NEW_API);
245 }
246
247 static void
248 gst_basic_scheduler_dispose (GObject * object)
249 {
250   G_OBJECT_CLASS (parent_class)->dispose (object);
251 }
252
253 static gboolean
254 plugin_init (GstPlugin * plugin)
255 {
256   if (!gst_scheduler_register (plugin, "basic" COTHREADS_NAME,
257           "A basic scheduler using " COTHREADS_NAME " cothreads",
258           gst_basic_scheduler_get_type ()))
259     return FALSE;
260
261   GST_DEBUG_CATEGORY_INIT (debug_dataflow, "basic_dataflow", 0,
262       "basic scheduler dataflow");
263   GST_DEBUG_CATEGORY_INIT (debug_scheduler, "basic_scheduler", 0,
264       "basic scheduler general information");
265
266   return TRUE;
267 }
268
269 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
270     GST_VERSION_MINOR,
271     "gstbasic" COTHREADS_NAME "scheduler",
272     "a basic scheduler using " COTHREADS_NAME " cothreads",
273     plugin_init, VERSION, GST_LICENSE, GST_PACKAGE, GST_ORIGIN)
274
275      static int gst_basic_scheduler_loopfunc_wrapper (int argc, char **argv)
276 {
277   GstElement *element = GST_ELEMENT (argv);
278   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
279
280   GST_DEBUG ("entering loopfunc wrapper of %s", name);
281
282   gst_object_ref (GST_OBJECT (element));
283   do {
284     GST_CAT_DEBUG (debug_dataflow, "calling loopfunc %s for element %s",
285         GST_DEBUG_FUNCPTR_NAME (element->loopfunc), name);
286     (element->loopfunc) (element);
287     GST_CAT_DEBUG (debug_dataflow, "element %s ended loop function", name);
288
289   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
290   GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
291
292   /* due to oddities in the cothreads code, when this function returns it will
293    * switch to the main cothread. thus, we need to unlock the current element. */
294   if (SCHED (element)) {
295     SCHED (element)->current = NULL;
296   }
297
298   GST_DEBUG ("leaving loopfunc wrapper of %s", name);
299   gst_object_unref (GST_OBJECT (element));
300
301   return 0;
302 }
303
304 static int
305 gst_basic_scheduler_chain_wrapper (int argc, char **argv)
306 {
307   GSList *already_iterated = NULL;
308   GstElement *element = GST_ELEMENT (argv);
309   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
310
311   GST_DEBUG ("entered chain wrapper of element %s", name);
312
313   GST_CAT_DEBUG (debug_dataflow, "stepping through pads");
314
315   gst_object_ref (GST_OBJECT (element));
316   do {
317     GList *pads;
318
319     do {
320       pads = element->pads;
321
322       while (pads) {
323         GstPad *pad = GST_PAD (pads->data);
324         GstRealPad *realpad;
325
326         if (!GST_IS_REAL_PAD (pad))
327           continue;
328
329         realpad = GST_REAL_PAD (pad);
330
331         if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SINK &&
332             GST_PAD_IS_LINKED (realpad) &&
333             g_slist_find (already_iterated, pad) == NULL) {
334           GstData *data;
335
336           GST_CAT_DEBUG (debug_dataflow, "pulling data from %s:%s", name,
337               GST_PAD_NAME (pad));
338           data = gst_pad_pull (pad);
339           if (data) {
340             if (GST_IS_EVENT (data) && !GST_ELEMENT_IS_EVENT_AWARE (element)) {
341               gst_pad_send_event (pad, GST_EVENT (data));
342             } else {
343               GST_CAT_DEBUG (debug_dataflow,
344                   "calling chain function of %s:%s %p", name,
345                   GST_PAD_NAME (pad), data);
346               gst_pad_call_chain_function (GST_PAD (realpad), data);
347               GST_CAT_DEBUG (debug_dataflow,
348                   "calling chain function of element %s done", name);
349             }
350           }
351           already_iterated = g_slist_prepend (already_iterated, pad);
352           break;
353         }
354         pads = g_list_next (pads);
355       }
356     } while (pads != NULL);
357     if (already_iterated == NULL) {
358       GST_DEBUG_OBJECT (SCHED (element), "nothing to iterate for element %s",
359           GST_ELEMENT_NAME (element));
360       break;
361     }
362     g_slist_free (already_iterated);
363     already_iterated = NULL;
364   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
365
366   GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
367
368   /* due to oddities in the cothreads code, when this function returns it will
369    * switch to the main cothread. thus, we need to unlock the current element. */
370   if (SCHED (element)) {
371     SCHED (element)->current = NULL;
372   }
373
374   GST_CAT_DEBUG (debug_dataflow, "leaving chain wrapper of element %s", name);
375   gst_object_unref (GST_OBJECT (element));
376
377   return 0;
378 }
379
380 static int
381 gst_basic_scheduler_src_wrapper (int argc, char **argv)
382 {
383   GstElement *element = GST_ELEMENT (argv);
384   GList *pads;
385   GstRealPad *realpad;
386   GstData *data = NULL;
387   gboolean inf_loop;
388   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
389
390   GST_DEBUG ("entering src wrapper of element %s", name);
391
392   do {
393     inf_loop = TRUE;
394     pads = element->pads;
395     while (pads) {
396
397       if (!GST_IS_REAL_PAD (pads->data))
398         continue;
399
400       realpad = GST_REAL_PAD (pads->data);
401
402       pads = g_list_next (pads);
403       if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SRC) {
404         inf_loop = FALSE;
405         GST_CAT_DEBUG (debug_dataflow, "calling _getfunc for %s:%s",
406             GST_DEBUG_PAD_NAME (realpad));
407         data = gst_pad_call_get_function (GST_PAD (realpad));
408         if (data) {
409           GST_CAT_DEBUG (debug_dataflow, "calling gst_pad_push on pad %s:%s %p",
410               GST_DEBUG_PAD_NAME (realpad), data);
411           gst_pad_push (GST_PAD (realpad), data);
412         }
413       }
414     }
415   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element) && !inf_loop);
416
417   GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
418
419   /* due to oddities in the cothreads code, when this function returns it will
420    * switch to the main cothread. thus, we need to unlock the current element. */
421   SCHED (element)->current = NULL;
422
423   GST_DEBUG ("leaving src wrapper of element %s", name);
424
425   return 0;
426 }
427
428 static void
429 gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstData * data)
430 {
431   gint loop_count = 100;
432   GstElement *parent;
433   GstRealPad *peer;
434
435   parent = GST_PAD_PARENT (pad);
436   peer = GST_RPAD_PEER (pad);
437
438   GST_CAT_DEBUG (debug_dataflow, "putting buffer %p in peer \"%s:%s\"'s pen",
439       data, GST_DEBUG_PAD_NAME (peer));
440
441   /* 
442    * loop until the bufferpen is empty so we can fill it up again
443    */
444   while (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) != NULL && --loop_count) {
445     GST_CAT_DEBUG (debug_dataflow, "switching to %p to empty bufpen %d",
446         GST_ELEMENT_THREADSTATE (parent), loop_count);
447
448     do_element_switch (parent);
449
450     /* we may no longer be the same pad, check. */
451     if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
452       GST_CAT_DEBUG (debug_dataflow, "new pad in mid-switch!");
453       pad = (GstPad *) GST_RPAD_PEER (peer);
454     }
455     parent = GST_PAD_PARENT (pad);
456     peer = GST_RPAD_PEER (pad);
457   }
458
459   if (loop_count == 0) {
460     GST_ELEMENT_ERROR (parent, CORE, SCHEDULER, (NULL),
461         ("(internal error) basic: maximum number of switches exceeded"));
462     return;
463   }
464
465   g_assert (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) == NULL);
466
467   /* now fill the bufferpen and switch so it can be consumed */
468   GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = data;
469   GST_CAT_DEBUG (debug_dataflow, "switching to %p to consume buffer %p",
470       GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)), data);
471
472   do_element_switch (parent);
473
474   GST_CAT_DEBUG (debug_dataflow, "leaving chainhandler proxy of %s:%s",
475       GST_DEBUG_PAD_NAME (pad));
476 }
477
478 static void
479 gst_basic_scheduler_select_proxy (GstPad * pad, GstData * data)
480 {
481   GstElement *parent;
482
483   parent = GST_PAD_PARENT (pad);
484
485   GST_CAT_DEBUG (debug_dataflow, "putting buffer %p in peer's pen of pad %s:%s",
486       data, GST_DEBUG_PAD_NAME (pad));
487
488   g_assert (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) == NULL);
489   /* now fill the bufferpen and switch so it can be consumed */
490   GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = data;
491   GST_CAT_DEBUG (debug_dataflow, "switching to %p",
492       GST_ELEMENT_THREADSTATE (parent));
493   /* FIXME temporarily diabled */
494   /* parent->select_pad = pad; */
495
496   do_element_switch (parent);
497
498   GST_CAT_DEBUG (debug_dataflow, "done switching");
499 }
500
501
502 static GstData *
503 gst_basic_scheduler_gethandler_proxy (GstPad * pad)
504 {
505   GstData *data;
506   GstElement *parent;
507   GstRealPad *peer;
508
509   GST_CAT_DEBUG (debug_dataflow, "entering gethandler proxy of %s:%s",
510       GST_DEBUG_PAD_NAME (pad));
511
512   parent = GST_PAD_PARENT (pad);
513   peer = GST_RPAD_PEER (pad);
514
515   /* FIXME this should be bounded */
516   /* we will loop switching to the peer until it's filled up the bufferpen */
517   while (GST_RPAD_BUFPEN (pad) == NULL) {
518
519     GST_CAT_DEBUG (debug_dataflow, "switching to \"%s\": %p to fill bufpen",
520         GST_ELEMENT_NAME (parent), GST_ELEMENT_THREADSTATE (parent));
521
522     do_element_switch (parent);
523
524     /* we may no longer be the same pad, check. */
525     if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
526       GST_CAT_DEBUG (debug_dataflow, "new pad in mid-switch!");
527       pad = (GstPad *) GST_RPAD_PEER (peer);
528       if (!pad) {
529         GST_ELEMENT_ERROR (parent, CORE, PAD, (NULL), ("pad unlinked"));
530       }
531       parent = GST_PAD_PARENT (pad);
532       peer = GST_RPAD_PEER (pad);
533     }
534   }
535   GST_CAT_DEBUG (debug_dataflow, "done switching");
536
537   /* now grab the buffer from the pen, clear the pen, and return the buffer */
538   data = GST_RPAD_BUFPEN (pad);
539   GST_RPAD_BUFPEN (pad) = NULL;
540
541   GST_CAT_DEBUG (debug_dataflow, "leaving gethandler proxy of %s:%s",
542       GST_DEBUG_PAD_NAME (pad));
543
544   return data;
545 }
546
547 static gboolean
548 gst_basic_scheduler_eventhandler_proxy (GstPad * srcpad, GstEvent * event)
549 {
550   gboolean flush;
551
552   GST_CAT_INFO (debug_dataflow, "intercepting event %d on pad %s:%s",
553       GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (srcpad));
554
555   /* figure out if we need to flush */
556   switch (GST_EVENT_TYPE (event)) {
557     case GST_EVENT_FLUSH:
558       flush = TRUE;
559       break;
560     case GST_EVENT_SEEK:
561     case GST_EVENT_SEEK_SEGMENT:
562       flush = GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH;
563       break;
564     default:
565       flush = FALSE;
566       break;
567   }
568
569   if (flush) {
570     GstData *data = GST_RPAD_BUFPEN (srcpad);
571
572     GST_CAT_INFO (debug_dataflow, "event is flush");
573
574     if (data) {
575       GST_CAT_INFO (debug_dataflow, "need to clear some buffers");
576
577       gst_data_unref (data);
578       GST_RPAD_BUFPEN (srcpad) = NULL;
579     }
580   }
581   return GST_RPAD_EVENTFUNC (srcpad) (srcpad, event);
582 }
583
584 static gboolean
585 gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
586 {
587   GList *elements;
588   GstElement *element;
589   cothread_func wrapper_function;
590   const GList *pads;
591   GstPad *pad;
592
593   GST_DEBUG ("chain is using COTHREADS");
594
595   g_assert (chain->sched->context != NULL);
596
597   /* walk through all the chain's elements */
598   elements = chain->elements;
599   while (elements) {
600     gboolean decoupled;
601
602     element = GST_ELEMENT (elements->data);
603     elements = g_list_next (elements);
604
605     decoupled = GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED);
606
607     /* start out without a wrapper function, we select it later */
608     wrapper_function = NULL;
609
610     /* if the element has a loopfunc... */
611     if (element->loopfunc != NULL) {
612       wrapper_function =
613           GST_DEBUG_FUNCPTR (gst_basic_scheduler_loopfunc_wrapper);
614       GST_DEBUG ("element '%s' is a loop-based", GST_ELEMENT_NAME (element));
615     } else {
616       /* otherwise we need to decide what kind of cothread
617        * if it's not DECOUPLED, we decide based on 
618        * whether it's a source or not */
619       if (!decoupled) {
620         /* if it doesn't have any sinks, it must be a source (duh) */
621         if (element->numsinkpads == 0) {
622           wrapper_function =
623               GST_DEBUG_FUNCPTR (gst_basic_scheduler_src_wrapper);
624           GST_DEBUG ("element '%s' is a source, using _src_wrapper",
625               GST_ELEMENT_NAME (element));
626         } else {
627           wrapper_function =
628               GST_DEBUG_FUNCPTR (gst_basic_scheduler_chain_wrapper);
629           GST_DEBUG ("element '%s' is a filter, using _chain_wrapper",
630               GST_ELEMENT_NAME (element));
631         }
632       }
633     }
634
635     /* now we have to walk through the pads to set up their state */
636     pads = element->pads;
637     while (pads) {
638       GstPad *peerpad;
639
640       pad = GST_PAD (pads->data);
641       pads = g_list_next (pads);
642
643       if (!GST_IS_REAL_PAD (pad))
644         continue;
645
646       peerpad = GST_PAD_PEER (pad);
647       if (peerpad) {
648         GstElement *peerelement = GST_ELEMENT (GST_PAD_PARENT (peerpad));
649         gboolean different_sched =
650             (peerelement->scheduler != GST_SCHEDULER (chain->sched));
651         gboolean peer_decoupled =
652             GST_FLAG_IS_SET (peerelement, GST_ELEMENT_DECOUPLED);
653
654         GST_DEBUG ("inspecting pad %s:%s", GST_DEBUG_PAD_NAME (peerpad));
655
656         /* we don't need to check this for decoupled elements */
657         if (!decoupled) {
658           /* if the peer element is in another schedule, 
659            * it's not decoupled and we are not decoupled
660            * either, we have an error */
661           if (different_sched && !peer_decoupled) {
662             GST_ELEMENT_ERROR (element, CORE, SCHEDULER, (NULL),
663                 ("element \"%s\" is not decoupled but has pads in different schedulers",
664                     GST_ELEMENT_NAME (element)));
665             return FALSE;
666           }
667           /* ok, the peer is in a different scheduler and is decoupled, 
668            * we need to set the
669            * handlers so we can talk with it */
670           else if (different_sched) {
671             if (GST_RPAD_DIRECTION (peerpad) == GST_PAD_SINK) {
672               GST_DEBUG ("copying chain func into push proxy for peer %s:%s",
673                   GST_DEBUG_PAD_NAME (peerpad));
674               GST_RPAD_CHAINHANDLER (peerpad) = gst_pad_call_chain_function;
675             } else {
676               GST_DEBUG ("copying get func into pull proxy for peer %s:%s",
677                   GST_DEBUG_PAD_NAME (peerpad));
678               GST_RPAD_GETHANDLER (peerpad) = gst_pad_call_get_function;
679             }
680           }
681         }
682         /* in any case we need to copy the eventfunc into the handler */
683         GST_RPAD_EVENTHANDLER (peerpad) = GST_RPAD_EVENTFUNC (peerpad);
684       }
685
686       /* if the element is DECOUPLED or outside the manager, we have to chain */
687       if (decoupled) {
688         /* set the chain proxies */
689         if (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK) {
690           GST_DEBUG ("copying chain function into push proxy for %s:%s",
691               GST_DEBUG_PAD_NAME (pad));
692           GST_RPAD_CHAINHANDLER (pad) = gst_pad_call_chain_function;
693         } else {
694           GST_DEBUG ("copying get function into pull proxy for %s:%s",
695               GST_DEBUG_PAD_NAME (pad));
696           GST_RPAD_GETHANDLER (pad) = gst_pad_call_get_function;
697         }
698       }
699       /* otherwise we really are a cothread */
700       else {
701         if (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK) {
702           GST_DEBUG ("setting cothreaded push proxy for sinkpad %s:%s",
703               GST_DEBUG_PAD_NAME (pad));
704           GST_RPAD_CHAINHANDLER (pad) =
705               GST_DEBUG_FUNCPTR (gst_basic_scheduler_chainhandler_proxy);
706           GST_RPAD_EVENTHANDLER (pad) = GST_RPAD_EVENTFUNC (pad);
707         } else {
708           GST_DEBUG ("setting cothreaded pull proxy for srcpad %s:%s",
709               GST_DEBUG_PAD_NAME (pad));
710           GST_RPAD_GETHANDLER (pad) =
711               GST_DEBUG_FUNCPTR (gst_basic_scheduler_gethandler_proxy);
712           /* the gethandler proxy function can queue a buffer in the bufpen, we need
713            * to remove this buffer when a flush event is sent on the pad */
714           GST_RPAD_EVENTHANDLER (pad) =
715               GST_DEBUG_FUNCPTR (gst_basic_scheduler_eventhandler_proxy);
716         }
717       }
718     }
719
720     /* need to set up the cothread now */
721     if (wrapper_function != NULL) {
722       if (GST_ELEMENT_THREADSTATE (element) == NULL) {
723         GST_DEBUG ("about to create a cothread, wrapper for '%s' is &%s",
724             GST_ELEMENT_NAME (element),
725             GST_DEBUG_FUNCPTR_NAME (wrapper_function));
726         do_cothread_create (GST_ELEMENT_THREADSTATE (element),
727             chain->sched->context, wrapper_function, 0, (char **) element);
728         if (GST_ELEMENT_THREADSTATE (element) == NULL) {
729           GST_ELEMENT_ERROR (element, RESOURCE, TOO_LAZY, (NULL),
730               ("could not create cothread for \"%s\"",
731                   GST_ELEMENT_NAME (element)));
732           return FALSE;
733         }
734         GST_DEBUG ("created cothread %p for '%s'",
735             GST_ELEMENT_THREADSTATE (element), GST_ELEMENT_NAME (element));
736       } else {
737         /* set the cothread wrapper function */
738         GST_DEBUG ("about to set the wrapper function for '%s' to &%s",
739             GST_ELEMENT_NAME (element),
740             GST_DEBUG_FUNCPTR_NAME (wrapper_function));
741         do_cothread_setfunc (GST_ELEMENT_THREADSTATE (element),
742             chain->sched->context, wrapper_function, 0, (char **) element);
743         GST_DEBUG ("set wrapper function for '%s' to &%s",
744             GST_ELEMENT_NAME (element),
745             GST_DEBUG_FUNCPTR_NAME (wrapper_function));
746       }
747     }
748   }
749
750   return TRUE;
751 }
752
753 static GstSchedulerChain *
754 gst_basic_scheduler_chain_new (GstBasicScheduler * sched)
755 {
756   GstSchedulerChain *chain = g_new (GstSchedulerChain, 1);
757
758   /* initialize the chain with sane values */
759   chain->sched = sched;
760   chain->disabled = NULL;
761   chain->elements = NULL;
762   chain->num_elements = 0;
763   chain->entry = NULL;
764   chain->cothreaded_elements = 0;
765   chain->schedule = FALSE;
766
767   /* add the chain to the schedulers' list of chains */
768   sched->chains = g_list_prepend (sched->chains, chain);
769   sched->num_chains++;
770
771   /* notify the scheduler that something changed */
772   GST_FLAG_SET (sched, GST_BASIC_SCHEDULER_CHANGE);
773
774   GST_INFO ("created new chain %p, now are %d chains in sched %p",
775       chain, sched->num_chains, sched);
776
777   return chain;
778 }
779
780 static void
781 gst_basic_scheduler_chain_destroy (GstSchedulerChain * chain)
782 {
783   GstBasicScheduler *sched = chain->sched;
784
785   /* remove the chain from the schedulers' list of chains */
786   sched->chains = g_list_remove (sched->chains, chain);
787   sched->num_chains--;
788
789   /* destroy the chain */
790   g_list_free (chain->disabled);        /* should be empty... */
791   g_list_free (chain->elements);        /* ditto              */
792
793   GST_INFO ("destroyed chain %p, now are %d chains in sched %p", chain,
794       sched->num_chains, sched);
795
796   g_free (chain);
797
798   /* notify the scheduler that something changed */
799   GST_FLAG_SET (sched, GST_BASIC_SCHEDULER_CHANGE);
800 }
801
802 static void
803 gst_basic_scheduler_chain_add_element (GstSchedulerChain * chain,
804     GstElement * element)
805 {
806   /* set the sched pointer for the element */
807   element->scheduler = GST_SCHEDULER (chain->sched);
808
809   /* add the element to either the main list or the disabled list */
810   if (GST_STATE (element) == GST_STATE_PLAYING) {
811     GST_INFO ("adding element \"%s\" to chain %p enabled",
812         GST_ELEMENT_NAME (element), chain);
813     chain->elements = g_list_prepend (chain->elements, element);
814   } else {
815     GST_INFO ("adding element \"%s\" to chain %p disabled",
816         GST_ELEMENT_NAME (element), chain);
817     chain->disabled = g_list_prepend (chain->disabled, element);
818   }
819   chain->num_elements++;
820
821   /* notify the scheduler that something changed */
822   GST_FLAG_SET (chain->sched, GST_BASIC_SCHEDULER_CHANGE);
823 }
824
825 static gboolean
826 gst_basic_scheduler_chain_enable_element (GstSchedulerChain * chain,
827     GstElement * element)
828 {
829   GST_INFO ("enabling element \"%s\" in chain %p",
830       GST_ELEMENT_NAME (element), chain);
831
832   /* remove from disabled list */
833   chain->disabled = g_list_remove (chain->disabled, element);
834
835   /* add to elements list */
836   chain->elements = g_list_prepend (chain->elements, element);
837
838   /* notify the scheduler that something changed */
839   GST_FLAG_SET (chain->sched, GST_BASIC_SCHEDULER_CHANGE);
840   /* GST_FLAG_UNSET(element, GST_ELEMENT_COTHREAD_STOPPING); */
841
842   /* reschedule the chain */
843   return gst_basic_scheduler_cothreaded_chain (GST_BIN (GST_SCHEDULER (chain->
844               sched)->parent), chain);
845 }
846
847 static void
848 gst_basic_scheduler_chain_disable_element (GstSchedulerChain * chain,
849     GstElement * element)
850 {
851   GST_INFO ("disabling element \"%s\" in chain %p",
852       GST_ELEMENT_NAME (element), chain);
853
854   /* remove from elements list */
855   chain->elements = g_list_remove (chain->elements, element);
856
857   /* add to disabled list */
858   chain->disabled = g_list_prepend (chain->disabled, element);
859
860   /* notify the scheduler that something changed */
861   GST_FLAG_SET (chain->sched, GST_BASIC_SCHEDULER_CHANGE);
862   GST_FLAG_SET (element, GST_ELEMENT_COTHREAD_STOPPING);
863
864   /* reschedule the chain */
865 /* FIXME this should be done only if manager state != NULL */
866 /*  gst_basic_scheduler_cothreaded_chain(GST_BIN(chain->sched->parent),chain); */
867 }
868
869 static void
870 gst_basic_scheduler_chain_remove_element (GstSchedulerChain * chain,
871     GstElement * element)
872 {
873   GST_INFO ("removing element \"%s\" from chain %p", GST_ELEMENT_NAME (element),
874       chain);
875
876   /* if it's active, deactivate it */
877   if (g_list_find (chain->elements, element)) {
878     gst_basic_scheduler_chain_disable_element (chain, element);
879   }
880   /* we have to check for a threadstate here because a queue doesn't have one */
881   if (GST_ELEMENT_THREADSTATE (element)) {
882     do_cothread_destroy (GST_ELEMENT_THREADSTATE (element));
883     GST_ELEMENT_THREADSTATE (element) = NULL;
884   }
885
886   /* remove the element from the list of elements */
887   chain->disabled = g_list_remove (chain->disabled, element);
888   chain->num_elements--;
889
890   /* notify the scheduler that something changed */
891   GST_FLAG_SET (chain->sched, GST_BASIC_SCHEDULER_CHANGE);
892
893   /* if there are no more elements in the chain, destroy the chain */
894   if (chain->num_elements == 0)
895     gst_basic_scheduler_chain_destroy (chain);
896
897 }
898
899 static void
900 gst_basic_scheduler_chain_elements (GstBasicScheduler * sched,
901     GstElement * element1, GstElement * element2)
902 {
903   GList *chains;
904   GstSchedulerChain *chain;
905   GstSchedulerChain *chain1 = NULL, *chain2 = NULL;
906   GstElement *element;
907
908   /* first find the chains that hold the two  */
909   chains = sched->chains;
910   while (chains) {
911     chain = (GstSchedulerChain *) (chains->data);
912     chains = g_list_next (chains);
913
914     if (g_list_find (chain->disabled, element1))
915       chain1 = chain;
916     else if (g_list_find (chain->elements, element1))
917       chain1 = chain;
918
919     if (g_list_find (chain->disabled, element2))
920       chain2 = chain;
921     else if (g_list_find (chain->elements, element2))
922       chain2 = chain;
923   }
924
925   /* first check to see if they're in the same chain, we're done if that's the case */
926   if ((chain1 != NULL) && (chain1 == chain2)) {
927     GST_INFO ("elements are already in the same chain");
928     return;
929   }
930
931   /* now, if neither element has a chain, create one */
932   if ((chain1 == NULL) && (chain2 == NULL)) {
933     GST_INFO ("creating new chain to hold two new elements");
934     chain = gst_basic_scheduler_chain_new (sched);
935     gst_basic_scheduler_chain_add_element (chain, element1);
936     gst_basic_scheduler_chain_add_element (chain, element2);
937     /* FIXME chain changed here */
938 /*    gst_basic_scheduler_cothreaded_chain(chain->sched->parent,chain); */
939
940     /* otherwise if both have chains already, join them */
941   } else if ((chain1 != NULL) && (chain2 != NULL)) {
942     GST_INFO ("merging chain %p into chain %p", chain2, chain1);
943     /* take the contents of chain2 and merge them into chain1 */
944     chain1->disabled =
945         g_list_concat (chain1->disabled, g_list_copy (chain2->disabled));
946     chain1->elements =
947         g_list_concat (chain1->elements, g_list_copy (chain2->elements));
948     chain1->num_elements += chain2->num_elements;
949     gst_basic_scheduler_chain_destroy (chain2);
950     if (sched->context)
951
952       gst_basic_scheduler_cothreaded_chain (GST_BIN (GST_SCHEDULER (chain1->
953                   sched)->parent), chain1);
954
955     /* otherwise one has a chain already, the other doesn't */
956   } else {
957     /* pick out which one has the chain, and which doesn't */
958     if (chain1 != NULL)
959       chain = chain1, element = element2;
960     else
961       chain = chain2, element = element1;
962
963     GST_INFO ("adding element to existing chain");
964     gst_basic_scheduler_chain_add_element (chain, element);
965     /* FIXME chain changed here */
966 /*    gst_basic_scheduler_cothreaded_chain(chain->sched->parent,chain); */
967   }
968
969 }
970
971
972 /* find the chain within the scheduler that holds the element, if any */
973 static GstSchedulerChain *
974 gst_basic_scheduler_find_chain (GstBasicScheduler * sched, GstElement * element)
975 {
976   GList *chains;
977   GstSchedulerChain *chain;
978
979   GST_INFO ("searching for element \"%s\" in chains",
980       GST_ELEMENT_NAME (element));
981
982   chains = sched->chains;
983   while (chains) {
984     chain = (GstSchedulerChain *) (chains->data);
985     chains = g_list_next (chains);
986
987     if (g_list_find (chain->elements, element))
988       return chain;
989     if (g_list_find (chain->disabled, element))
990       return chain;
991   }
992
993   return NULL;
994 }
995
996 static void
997 gst_basic_scheduler_chain_recursive_add (GstSchedulerChain * chain,
998     GstElement * element, gboolean remove)
999 {
1000   GList *pads;
1001   GstPad *pad;
1002   GstElement *peerelement;
1003   GstSchedulerChain *prevchain;
1004
1005   /* check to see if it's in a chain already */
1006   prevchain = gst_basic_scheduler_find_chain (chain->sched, element);
1007   /* if it's already in another chain, either remove or punt */
1008   if (prevchain != NULL) {
1009     if (remove == TRUE)
1010       gst_basic_scheduler_chain_remove_element (prevchain, element);
1011     else
1012       return;
1013   }
1014
1015   /* add it to this one */
1016   gst_basic_scheduler_chain_add_element (chain, element);
1017
1018   GST_DEBUG ("recursing on element \"%s\"", GST_ELEMENT_NAME (element));
1019   /* now go through all the pads and see which peers can be added */
1020   pads = element->pads;
1021   while (pads) {
1022     pad = GST_PAD (pads->data);
1023     pads = g_list_next (pads);
1024
1025     GST_DEBUG ("have pad %s:%s, checking for valid peer",
1026         GST_DEBUG_PAD_NAME (pad));
1027     /* if the peer exists and could be in the same chain */
1028     if (GST_PAD_PEER (pad)) {
1029       GST_DEBUG ("has peer %s:%s", GST_DEBUG_PAD_NAME (GST_PAD_PEER (pad)));
1030       peerelement = GST_PAD_PARENT (GST_PAD_PEER (pad));
1031       if (GST_ELEMENT_SCHEDULER (GST_PAD_PARENT (pad)) ==
1032           GST_ELEMENT_SCHEDULER (peerelement)) {
1033         GST_DEBUG ("peer \"%s\" is valid for same chain",
1034             GST_ELEMENT_NAME (peerelement));
1035         gst_basic_scheduler_chain_recursive_add (chain, peerelement, remove);
1036       }
1037     }
1038   }
1039 }
1040
1041 /*
1042  * Entry points for this scheduler.
1043  */
1044 static void
1045 gst_basic_scheduler_setup (GstScheduler * sched)
1046 {
1047   /* first create thread context */
1048   if (GST_BASIC_SCHEDULER (sched)->context == NULL) {
1049     GST_DEBUG ("initializing cothread context");
1050     GST_BASIC_SCHEDULER (sched)->context = do_cothread_context_init ();
1051   }
1052 }
1053
1054 static void
1055 gst_basic_scheduler_reset (GstScheduler * sched)
1056 {
1057   cothread_context *ctx;
1058   GList *elements = GST_BASIC_SCHEDULER (sched)->elements;
1059
1060   while (elements) {
1061     GstElement *element = GST_ELEMENT (elements->data);
1062
1063     if (GST_ELEMENT_THREADSTATE (element)) {
1064       do_cothread_destroy (GST_ELEMENT_THREADSTATE (element));
1065       GST_ELEMENT_THREADSTATE (element) = NULL;
1066     }
1067     elements = g_list_next (elements);
1068   }
1069
1070   ctx = GST_BASIC_SCHEDULER (sched)->context;
1071
1072   do_cothread_context_destroy (ctx);
1073
1074   GST_BASIC_SCHEDULER (sched)->context = NULL;
1075 }
1076
1077 static void
1078 gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
1079 {
1080   GstSchedulerChain *chain;
1081   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1082
1083   GST_INFO ("adding element \"%s\" to scheduler", GST_ELEMENT_NAME (element));
1084
1085   /* only deal with elements after this point, not bins */
1086   /* exception is made for Bin's that are schedulable, like the autoplugger */
1087   if (GST_IS_BIN (element)
1088       && !GST_FLAG_IS_SET (element, GST_BIN_SELF_SCHEDULABLE))
1089     return;
1090
1091   /* first add it to the list of elements that are to be scheduled */
1092   bsched->elements = g_list_prepend (bsched->elements, element);
1093   bsched->num_elements++;
1094
1095   /* create a chain to hold it, and add */
1096   chain = gst_basic_scheduler_chain_new (bsched);
1097   gst_basic_scheduler_chain_add_element (chain, element);
1098 }
1099
1100 static void
1101 gst_basic_scheduler_remove_element (GstScheduler * sched, GstElement * element)
1102 {
1103   GstSchedulerChain *chain;
1104   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1105
1106   if (g_list_find (bsched->elements, element)) {
1107     GST_INFO ("removing element \"%s\" from scheduler",
1108         GST_ELEMENT_NAME (element));
1109
1110     /* if we are removing the currently scheduled element */
1111     if (bsched->current == element) {
1112       GST_FLAG_SET (element, GST_ELEMENT_COTHREAD_STOPPING);
1113       bsched->current = NULL;
1114     }
1115     /* find what chain the element is in */
1116     chain = gst_basic_scheduler_find_chain (bsched, element);
1117
1118     /* remove it from its chain */
1119     if (chain != NULL) {
1120       gst_basic_scheduler_chain_remove_element (chain, element);
1121     }
1122
1123     /* remove it from the list of elements */
1124     bsched->elements = g_list_remove (bsched->elements, element);
1125     bsched->num_elements--;
1126
1127     /* unset the scheduler pointer in the element */
1128   }
1129 }
1130
1131 static GstElementStateReturn
1132 gst_basic_scheduler_state_transition (GstScheduler * sched,
1133     GstElement * element, gint transition)
1134 {
1135   GstSchedulerChain *chain;
1136   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1137
1138   /* check if our parent changed state */
1139   if (GST_SCHEDULER_PARENT (sched) == element) {
1140     GST_INFO ("parent \"%s\" changed state", GST_ELEMENT_NAME (element));
1141     if (transition == GST_STATE_PLAYING_TO_PAUSED) {
1142       GST_INFO ("setting scheduler state to stopped");
1143       GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_STOPPED;
1144     } else if (transition == GST_STATE_PAUSED_TO_PLAYING) {
1145       GST_INFO ("setting scheduler state to running");
1146       GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_RUNNING;
1147     } else {
1148       GST_INFO ("no interesting state change, doing nothing");
1149     }
1150   } else if (transition == GST_STATE_PLAYING_TO_PAUSED ||
1151       transition == GST_STATE_PAUSED_TO_PLAYING) {
1152     /* find the chain the element is in */
1153     chain = gst_basic_scheduler_find_chain (bsched, element);
1154
1155     /* remove it from the chain */
1156     if (chain) {
1157       if (transition == GST_STATE_PLAYING_TO_PAUSED) {
1158         gst_basic_scheduler_chain_disable_element (chain, element);
1159       } else if (transition == GST_STATE_PAUSED_TO_PLAYING) {
1160         if (!gst_basic_scheduler_chain_enable_element (chain, element)) {
1161           GST_INFO ("could not enable element \"%s\"",
1162               GST_ELEMENT_NAME (element));
1163           return GST_STATE_FAILURE;
1164         }
1165       }
1166     } else {
1167       GST_INFO ("element \"%s\" not found in any chain, no state change",
1168           GST_ELEMENT_NAME (element));
1169     }
1170   }
1171
1172   return GST_STATE_SUCCESS;
1173 }
1174
1175 static gboolean
1176 gst_basic_scheduler_yield (GstScheduler * sched, GstElement * element)
1177 {
1178   if (GST_ELEMENT_IS_COTHREAD_STOPPING (element)) {
1179
1180     do_switch_to_main (sched);
1181
1182     /* no need to do a pre_run, the cothread is stopping */
1183   }
1184   return FALSE;
1185 }
1186
1187 static gboolean
1188 gst_basic_scheduler_interrupt (GstScheduler * sched, GstElement * element)
1189 {
1190
1191   GST_FLAG_SET (element, GST_ELEMENT_COTHREAD_STOPPING);
1192   do_switch_to_main (sched);
1193
1194   return FALSE;
1195 }
1196
1197 static void
1198 gst_basic_scheduler_error (GstScheduler * sched, GstElement * element)
1199 {
1200   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1201
1202   if (GST_ELEMENT_THREADSTATE (element)) {
1203     GstSchedulerChain *chain;
1204
1205     chain = gst_basic_scheduler_find_chain (bsched, element);
1206     if (chain)
1207       gst_basic_scheduler_chain_disable_element (chain, element);
1208
1209     GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_ERROR;
1210
1211     do_switch_to_main (sched);
1212   }
1213 }
1214
1215 static void
1216 gst_basic_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
1217     GstPad * sinkpad)
1218 {
1219   GstElement *srcelement, *sinkelement;
1220   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1221
1222   srcelement = GST_PAD_PARENT (srcpad);
1223   g_return_if_fail (srcelement != NULL);
1224   sinkelement = GST_PAD_PARENT (sinkpad);
1225   g_return_if_fail (sinkelement != NULL);
1226
1227   GST_INFO ("have pad linked callback on %s:%s to %s:%s",
1228       GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
1229   GST_DEBUG ("srcpad sched is %p, sinkpad sched is %p",
1230       GST_ELEMENT_SCHEDULER (srcelement), GST_ELEMENT_SCHEDULER (sinkelement));
1231
1232   gst_basic_scheduler_chain_elements (bsched, srcelement, sinkelement);
1233 }
1234
1235 static void
1236 gst_basic_scheduler_pad_unlink (GstScheduler * sched, GstPad * srcpad,
1237     GstPad * sinkpad)
1238 {
1239   GstElement *element1, *element2;
1240   GstSchedulerChain *chain1, *chain2;
1241   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1242
1243   GST_INFO ("unlinking pads %s:%s and %s:%s",
1244       GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
1245
1246   /* we need to have the parent elements of each pad */
1247   element1 = GST_ELEMENT (GST_PAD_PARENT (srcpad));
1248   element2 = GST_ELEMENT (GST_PAD_PARENT (sinkpad));
1249
1250   /* first task is to remove the old chain they belonged to.
1251    * this can be accomplished by taking either of the elements,
1252    * since they are guaranteed to be in the same chain
1253    * FIXME is it potentially better to make an attempt at splitting cleaner??
1254    */
1255   chain1 = gst_basic_scheduler_find_chain (bsched, element1);
1256   chain2 = gst_basic_scheduler_find_chain (bsched, element2);
1257
1258 /* FIXME: The old code still works in most cases, but does not deal with
1259  * the problem of screwed up sched chains in some autoplugging cases.
1260  * The new code has an infinite recursion bug during pipeline shutdown,
1261  * which must be fixed before it can be enabled again.
1262  */
1263 #if 1
1264   if (chain1 != chain2) {
1265     /* elements not in the same chain don't need to be separated */
1266     GST_INFO ("elements not in the same chain");
1267     return;
1268   }
1269
1270   if (chain1) {
1271     GST_INFO ("destroying chain");
1272     gst_basic_scheduler_chain_destroy (chain1);
1273
1274     /* now create a new chain to hold element1 and build it from scratch */
1275     chain1 = gst_basic_scheduler_chain_new (bsched);
1276     gst_basic_scheduler_chain_recursive_add (chain1, element1, FALSE);
1277   }
1278
1279   /* check the other element to see if it landed in the newly created chain */
1280   if (gst_basic_scheduler_find_chain (bsched, element2) == NULL) {
1281     /* if not in chain, create chain and build from scratch */
1282     chain2 = gst_basic_scheduler_chain_new (bsched);
1283     gst_basic_scheduler_chain_recursive_add (chain2, element2, FALSE);
1284   }
1285 #else
1286
1287   /* if they're both in the same chain, move second set of elements to a new chain */
1288   if (chain1 && (chain1 == chain2)) {
1289     GST_INFO ("creating new chain for second element and peers");
1290     chain2 = gst_basic_scheduler_chain_new (bsched);
1291     gst_basic_scheduler_chain_recursive_add (chain2, element2, TRUE);
1292   }
1293 #endif
1294 }
1295
1296 static GstData *
1297 gst_basic_scheduler_pad_select (GstScheduler * sched, GstPad ** selected,
1298     GstPad ** padlist)
1299 {
1300   GstData *data = NULL;
1301   gint i = 0;
1302
1303   GST_INFO ("performing select");
1304
1305   while (padlist[i]) {
1306     GstPad *pad = padlist[i];
1307
1308     GST_RPAD_CHAINHANDLER (pad) =
1309         GST_DEBUG_FUNCPTR (gst_basic_scheduler_select_proxy);
1310   }
1311
1312   do_element_switch (GST_PAD_PARENT (GST_PAD_PEER (padlist[0])));
1313
1314   i = 0;
1315   while (padlist[i]) {
1316     GstPad *pad = padlist[i];
1317
1318     if (GST_RPAD_BUFPEN (pad)) {
1319       *selected = pad;
1320       data = GST_RPAD_BUFPEN (pad);
1321       GST_RPAD_BUFPEN (pad) = NULL;
1322     }
1323
1324     GST_RPAD_CHAINHANDLER (pad) =
1325         GST_DEBUG_FUNCPTR (gst_basic_scheduler_chainhandler_proxy);
1326   }
1327
1328   g_assert (data != NULL);
1329   return data;
1330 }
1331
1332 static GstSchedulerState
1333 gst_basic_scheduler_iterate (GstScheduler * sched)
1334 {
1335   GList *chains;
1336   GstSchedulerChain *chain;
1337   GstElement *entry;
1338   GList *elements;
1339   gint scheduled = 0;
1340   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1341
1342   GST_CAT_LOG_OBJECT (debug_dataflow, sched,
1343       "starting iteration in bin %s", GST_ELEMENT_NAME (sched->parent));
1344
1345   /* clear the changes flag */
1346   GST_FLAG_UNSET (bsched, GST_BASIC_SCHEDULER_CHANGE);
1347
1348   /* step through all the chains */
1349   chains = bsched->chains;
1350
1351   if (chains == NULL)
1352     return GST_SCHEDULER_STATE_STOPPED;
1353
1354   while (chains) {
1355     chain = (GstSchedulerChain *) (chains->data);
1356     chains = g_list_next (chains);
1357
1358     /* all we really have to do is switch to the first child            */
1359     /* FIXME this should be lots more intelligent about where to start  */
1360     GST_CAT_DEBUG (debug_dataflow,
1361         "starting iteration via cothreads using %s scheduler", _SCHEDULER_NAME);
1362
1363     if (chain->elements) {
1364       entry = NULL;             /*MattH ADDED? */
1365       GST_DEBUG ("there are %d elements in this chain", chain->num_elements);
1366       elements = chain->elements;
1367       while (elements) {
1368         entry = GST_ELEMENT (elements->data);
1369         elements = g_list_next (elements);
1370         if (GST_FLAG_IS_SET (entry, GST_ELEMENT_DECOUPLED)) {
1371           GST_DEBUG ("entry \"%s\" is DECOUPLED, skipping",
1372               GST_ELEMENT_NAME (entry));
1373           entry = NULL;
1374         } else if (GST_FLAG_IS_SET (entry, GST_ELEMENT_INFINITE_LOOP)) {
1375           GST_DEBUG ("entry \"%s\" is not valid, skipping",
1376               GST_ELEMENT_NAME (entry));
1377           entry = NULL;
1378         } else
1379           break;
1380       }
1381       if (entry) {
1382         GstSchedulerState state;
1383
1384         GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
1385
1386         GST_CAT_DEBUG (debug_dataflow,
1387             "set COTHREAD_STOPPING flag on \"%s\"(@%p)",
1388             GST_ELEMENT_NAME (entry), entry);
1389         if (GST_ELEMENT_THREADSTATE (entry)) {
1390
1391           do_switch_from_main (entry);
1392
1393           state = GST_SCHEDULER_STATE (sched);
1394           /* if something changed, return - go on else */
1395           if (GST_FLAG_IS_SET (bsched, GST_BASIC_SCHEDULER_CHANGE) &&
1396               state != GST_SCHEDULER_STATE_ERROR)
1397             return GST_SCHEDULER_STATE_RUNNING;
1398         } else {
1399           GST_CAT_DEBUG (debug_dataflow,
1400               "cothread switch not possible, element has no threadstate");
1401           return GST_SCHEDULER_STATE_ERROR;
1402         }
1403
1404         /* following is a check to see if the chain was interrupted due to a
1405          * top-half state_change().  (i.e., if there's a pending state.)
1406          *
1407          * if it was, return to gstthread.c::gst_thread_main_loop() to
1408          * execute the state change.
1409          */
1410         GST_CAT_DEBUG (debug_dataflow, "cothread switch ended or interrupted");
1411
1412         if (state != GST_SCHEDULER_STATE_RUNNING) {
1413           GST_CAT_INFO (debug_dataflow, "scheduler is not running, in state %d",
1414               state);
1415           return state;
1416         }
1417
1418         scheduled++;
1419       } else {
1420         GST_CAT_INFO (debug_dataflow,
1421             "no entry in this chain, trying the next one");
1422       }
1423     } else {
1424       GST_CAT_INFO (debug_dataflow,
1425           "no enabled elements in this chain, trying the next one");
1426     }
1427   }
1428
1429   GST_CAT_LOG_OBJECT (debug_dataflow, sched, "leaving (%s)",
1430       GST_ELEMENT_NAME (sched->parent));
1431   if (scheduled == 0) {
1432     GST_CAT_INFO (debug_dataflow, "nothing was scheduled, return STOPPED");
1433     return GST_SCHEDULER_STATE_STOPPED;
1434   } else {
1435     GST_CAT_INFO (debug_dataflow, "scheduler still running, return RUNNING");
1436     return GST_SCHEDULER_STATE_RUNNING;
1437   }
1438 }
1439
1440
1441 static void
1442 gst_basic_scheduler_show (GstScheduler * sched)
1443 {
1444   GList *chains, *elements;
1445   GstElement *element;
1446   GstSchedulerChain *chain;
1447   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1448
1449   if (sched == NULL) {
1450     g_print ("scheduler doesn't exist for this element\n");
1451     return;
1452   }
1453
1454   g_return_if_fail (GST_IS_SCHEDULER (sched));
1455
1456   g_print ("SCHEDULER DUMP FOR MANAGING BIN \"%s\"\n",
1457       GST_ELEMENT_NAME (sched->parent));
1458
1459   g_print ("scheduler has %d elements in it: ", bsched->num_elements);
1460   elements = bsched->elements;
1461   while (elements) {
1462     element = GST_ELEMENT (elements->data);
1463     elements = g_list_next (elements);
1464
1465     g_print ("%s, ", GST_ELEMENT_NAME (element));
1466   }
1467   g_print ("\n");
1468
1469   g_print ("scheduler has %d chains in it\n", bsched->num_chains);
1470   chains = bsched->chains;
1471   while (chains) {
1472     chain = (GstSchedulerChain *) (chains->data);
1473     chains = g_list_next (chains);
1474
1475     g_print ("%p: ", chain);
1476
1477     elements = chain->disabled;
1478     while (elements) {
1479       element = GST_ELEMENT (elements->data);
1480       elements = g_list_next (elements);
1481
1482       g_print ("!%s, ", GST_ELEMENT_NAME (element));
1483     }
1484
1485     elements = chain->elements;
1486     while (elements) {
1487       element = GST_ELEMENT (elements->data);
1488       elements = g_list_next (elements);
1489
1490       g_print ("%s, ", GST_ELEMENT_NAME (element));
1491     }
1492     g_print ("\n");
1493   }
1494 }