First THREADED backport attempt, focusing on adding locks and making sure the API...
[platform/upstream/gstreamer.git] / gst / schedulers / gstbasicscheduler.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstscheduler.c: Default scheduling code for most cases
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #include <gst/gst.h>
28
29 #include "cothreads_compat.h"
30
31 GST_DEBUG_CATEGORY_STATIC (debug_dataflow);
32 GST_DEBUG_CATEGORY_STATIC (debug_scheduler);
33 #define GST_CAT_DEFAULT debug_scheduler
34
35 typedef struct _GstSchedulerChain GstSchedulerChain;
36
37 #define GST_ELEMENT_THREADSTATE(elem)   (GST_ELEMENT (elem)->sched_private)
38 #define GST_RPAD_BUFPEN(pad)            (GST_REAL_PAD(pad)->sched_private)
39
40 #define GST_ELEMENT_COTHREAD_STOPPING                   GST_ELEMENT_SCHEDULER_PRIVATE1
41 #define GST_ELEMENT_IS_COTHREAD_STOPPING(element)       GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
42
43 typedef struct _GstBasicScheduler GstBasicScheduler;
44 typedef struct _GstBasicSchedulerClass GstBasicSchedulerClass;
45
46 #ifdef _COTHREADS_STANDARD
47 # define _SCHEDULER_NAME "standard"
48 #else
49 # define _SCHEDULER_NAME "basic"
50 #endif
51
52 struct _GstSchedulerChain
53 {
54   GstBasicScheduler *sched;
55
56   GList *disabled;
57
58   GList *elements;
59   gint num_elements;
60
61   GstElement *entry;
62
63   gint cothreaded_elements;
64   gboolean schedule;
65 };
66
67 #define GST_TYPE_BASIC_SCHEDULER \
68   (gst_basic_scheduler_get_type())
69 #define GST_BASIC_SCHEDULER(obj) \
70   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_BASIC_SCHEDULER,GstBasicScheduler))
71 #define GST_BASIC_SCHEDULER_CLASS(klass) \
72   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_BASIC_SCHEDULER,GstBasicSchedulerClass))
73 #define GST_IS_BASIC_SCHEDULER(obj) \
74   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_BASIC_SCHEDULER))
75 #define GST_IS_BASIC_SCHEDULER_CLASS(obj) \
76   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_BASIC_SCHEDULER))
77
78 #define SCHED(element) GST_BASIC_SCHEDULER (GST_ELEMENT_SCHEDULER (element))
79
80 typedef enum
81 {
82   GST_BASIC_SCHEDULER_STATE_NONE,
83   GST_BASIC_SCHEDULER_STATE_STOPPED,
84   GST_BASIC_SCHEDULER_STATE_ERROR,
85   GST_BASIC_SCHEDULER_STATE_RUNNING
86 }
87 GstBasicSchedulerState;
88
89 typedef enum
90 {
91   /* something important has changed inside the scheduler */
92   GST_BASIC_SCHEDULER_CHANGE = GST_SCHEDULER_FLAG_LAST
93 }
94 GstBasicSchedulerFlags;
95
96 struct _GstBasicScheduler
97 {
98   GstScheduler parent;
99
100   GList *elements;
101   gint num_elements;
102
103   GList *chains;
104   gint num_chains;
105
106   GstBasicSchedulerState state;
107
108   cothread_context *context;
109   GstElement *current;
110 };
111
112 struct _GstBasicSchedulerClass
113 {
114   GstSchedulerClass parent_class;
115 };
116
117 static GType _gst_basic_scheduler_type = 0;
118
119 static void gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass);
120 static void gst_basic_scheduler_init (GstBasicScheduler * scheduler);
121
122 static void gst_basic_scheduler_dispose (GObject * object);
123
124 static void gst_basic_scheduler_setup (GstScheduler * sched);
125 static void gst_basic_scheduler_reset (GstScheduler * sched);
126 static void gst_basic_scheduler_add_element (GstScheduler * sched,
127     GstElement * element);
128 static void gst_basic_scheduler_remove_element (GstScheduler * sched,
129     GstElement * element);
130 static GstElementStateReturn gst_basic_scheduler_state_transition (GstScheduler
131     * sched, GstElement * element, gint transition);
132 static gboolean gst_basic_scheduler_yield (GstScheduler * sched,
133     GstElement * element);
134 static gboolean gst_basic_scheduler_interrupt (GstScheduler * sched,
135     GstElement * element);
136 static void gst_basic_scheduler_error (GstScheduler * sched,
137     GstElement * element);
138 static void gst_basic_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
139     GstPad * sinkpad);
140 static void gst_basic_scheduler_pad_unlink (GstScheduler * sched,
141     GstPad * srcpad, GstPad * sinkpad);
142 static GstData *gst_basic_scheduler_pad_select (GstScheduler * sched,
143     GstPad ** selected, GstPad ** padlist);
144 static GstSchedulerState gst_basic_scheduler_iterate (GstScheduler * sched);
145
146 static void gst_basic_scheduler_show (GstScheduler * sched);
147
148 static GstSchedulerClass *parent_class = NULL;
149
150 /* for threaded bins, these pre- and post-run functions lock and unlock the
151  * elements. we have to avoid deadlocks, so we make these convenience macros
152  * that will avoid using do_cothread_switch from within the scheduler. */
153
154 #define do_element_switch(element) G_STMT_START{                \
155   GstElement *from = SCHED (element)->current;                  \
156   if (from && from->post_run_func)                              \
157     from->post_run_func (from);                                 \
158   SCHED (element)->current = element;                           \
159   if (element->pre_run_func)                                    \
160     element->pre_run_func (element);                            \
161   do_cothread_switch (GST_ELEMENT_THREADSTATE (element));       \
162 }G_STMT_END
163
164 #define do_switch_to_main(sched) G_STMT_START{                  \
165   GstElement *current = ((GstBasicScheduler*)sched)->current;   \
166   if (current && current->post_run_func)                        \
167     current->post_run_func (current);                           \
168   ((GstBasicScheduler*) sched)->current = NULL;                         \
169   do_cothread_switch                                            \
170     (do_cothread_get_main                                       \
171        (((GstBasicScheduler*)sched)->context));                 \
172 }G_STMT_END
173
174 #define do_switch_from_main(entry) G_STMT_START{                \
175   if (entry->pre_run_func)                                      \
176     entry->pre_run_func (entry);                                \
177   SCHED (entry)->current = entry;                               \
178   do_cothread_switch (GST_ELEMENT_THREADSTATE (entry));         \
179 }G_STMT_END
180
181 static GType
182 gst_basic_scheduler_get_type (void)
183 {
184   if (!_gst_basic_scheduler_type) {
185     static const GTypeInfo scheduler_info = {
186       sizeof (GstBasicSchedulerClass),
187       NULL,
188       NULL,
189       (GClassInitFunc) gst_basic_scheduler_class_init,
190       NULL,
191       NULL,
192       sizeof (GstBasicScheduler),
193       0,
194       (GInstanceInitFunc) gst_basic_scheduler_init,
195       NULL
196     };
197
198     _gst_basic_scheduler_type =
199         g_type_register_static (GST_TYPE_SCHEDULER,
200         "Gst" COTHREADS_NAME_CAPITAL "Scheduler", &scheduler_info, 0);
201   }
202   return _gst_basic_scheduler_type;
203 }
204
205 static void
206 gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass)
207 {
208   GObjectClass *gobject_class;
209   GstObjectClass *gstobject_class;
210   GstSchedulerClass *gstscheduler_class;
211
212   gobject_class = (GObjectClass *) klass;
213   gstobject_class = (GstObjectClass *) klass;
214   gstscheduler_class = (GstSchedulerClass *) klass;
215
216   parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
217
218   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_basic_scheduler_dispose);
219
220   gstscheduler_class->setup = GST_DEBUG_FUNCPTR (gst_basic_scheduler_setup);
221   gstscheduler_class->reset = GST_DEBUG_FUNCPTR (gst_basic_scheduler_reset);
222   gstscheduler_class->add_element =
223       GST_DEBUG_FUNCPTR (gst_basic_scheduler_add_element);
224   gstscheduler_class->remove_element =
225       GST_DEBUG_FUNCPTR (gst_basic_scheduler_remove_element);
226   gstscheduler_class->state_transition =
227       GST_DEBUG_FUNCPTR (gst_basic_scheduler_state_transition);
228   gstscheduler_class->yield = GST_DEBUG_FUNCPTR (gst_basic_scheduler_yield);
229   gstscheduler_class->interrupt =
230       GST_DEBUG_FUNCPTR (gst_basic_scheduler_interrupt);
231   gstscheduler_class->error = GST_DEBUG_FUNCPTR (gst_basic_scheduler_error);
232   gstscheduler_class->pad_link =
233       GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_link);
234   gstscheduler_class->pad_unlink =
235       GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_unlink);
236   gstscheduler_class->pad_select =
237       GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_select);
238   gstscheduler_class->clock_wait = NULL;
239   gstscheduler_class->iterate = GST_DEBUG_FUNCPTR (gst_basic_scheduler_iterate);
240
241   gstscheduler_class->show = GST_DEBUG_FUNCPTR (gst_basic_scheduler_show);
242
243   do_cothreads_init (NULL);
244 }
245
246 static void
247 gst_basic_scheduler_init (GstBasicScheduler * scheduler)
248 {
249   scheduler->elements = NULL;
250   scheduler->num_elements = 0;
251   scheduler->chains = NULL;
252   scheduler->num_chains = 0;
253
254   GST_FLAG_SET (scheduler, GST_SCHEDULER_FLAG_NEW_API);
255 }
256
257 static void
258 gst_basic_scheduler_dispose (GObject * object)
259 {
260   G_OBJECT_CLASS (parent_class)->dispose (object);
261 }
262
263 static gboolean
264 plugin_init (GstPlugin * plugin)
265 {
266   if (!gst_scheduler_register (plugin, "basic" COTHREADS_NAME,
267           "A basic scheduler using " COTHREADS_NAME " cothreads",
268           gst_basic_scheduler_get_type ()))
269     return FALSE;
270
271   GST_DEBUG_CATEGORY_INIT (debug_dataflow, "basic_dataflow", 0,
272       "basic scheduler dataflow");
273   GST_DEBUG_CATEGORY_INIT (debug_scheduler, "basic_scheduler", 0,
274       "basic scheduler general information");
275
276   return TRUE;
277 }
278
279 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
280     GST_VERSION_MINOR,
281     "gstbasic" COTHREADS_NAME "scheduler",
282     "a basic scheduler using " COTHREADS_NAME " cothreads",
283     plugin_init, VERSION, GST_LICENSE, GST_PACKAGE, GST_ORIGIN)
284
285      static int gst_basic_scheduler_loopfunc_wrapper (int argc, char **argv)
286 {
287   GstElement *element = GST_ELEMENT (argv);
288   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
289
290   GST_DEBUG ("entering loopfunc wrapper of %s", name);
291
292   gst_object_ref (GST_OBJECT (element));
293   do {
294     GST_CAT_DEBUG (debug_dataflow, "calling loopfunc %s for element %s",
295         GST_DEBUG_FUNCPTR_NAME (element->loopfunc), name);
296     (element->loopfunc) (element);
297     GST_CAT_DEBUG (debug_dataflow, "element %s ended loop function", name);
298
299   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
300   GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
301
302   /* due to oddities in the cothreads code, when this function returns it will
303    * switch to the main cothread. thus, we need to unlock the current element. */
304   if (SCHED (element)) {
305     if (SCHED (element)->current && SCHED (element)->current->post_run_func) {
306       SCHED (element)->current->post_run_func (SCHED (element)->current);
307     }
308     SCHED (element)->current = NULL;
309   }
310
311   GST_DEBUG ("leaving loopfunc wrapper of %s", name);
312   gst_object_unref (GST_OBJECT (element));
313
314   return 0;
315 }
316
317 static int
318 gst_basic_scheduler_chain_wrapper (int argc, char **argv)
319 {
320   GSList *already_iterated = NULL;
321   GstElement *element = GST_ELEMENT (argv);
322   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
323
324   GST_DEBUG ("entered chain wrapper of element %s", name);
325
326   GST_CAT_DEBUG (debug_dataflow, "stepping through pads");
327
328   gst_object_ref (GST_OBJECT (element));
329   do {
330     GList *pads;
331
332     do {
333       pads = element->pads;
334
335       while (pads) {
336         GstPad *pad = GST_PAD (pads->data);
337         GstRealPad *realpad;
338
339         if (!GST_IS_REAL_PAD (pad))
340           continue;
341
342         realpad = GST_REAL_PAD (pad);
343
344         if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SINK &&
345             GST_PAD_IS_LINKED (realpad) &&
346             g_slist_find (already_iterated, pad) == NULL) {
347           GstData *data;
348
349           GST_CAT_DEBUG (debug_dataflow, "pulling data from %s:%s", name,
350               GST_PAD_NAME (pad));
351           data = gst_pad_pull (pad);
352           if (data) {
353             if (GST_IS_EVENT (data) && !GST_ELEMENT_IS_EVENT_AWARE (element)) {
354               gst_pad_send_event (pad, GST_EVENT (data));
355             } else {
356               GST_CAT_DEBUG (debug_dataflow,
357                   "calling chain function of %s:%s %p", name,
358                   GST_PAD_NAME (pad), data);
359               gst_pad_call_chain_function (GST_PAD (realpad), data);
360               GST_CAT_DEBUG (debug_dataflow,
361                   "calling chain function of element %s done", name);
362             }
363           }
364           already_iterated = g_slist_prepend (already_iterated, pad);
365           break;
366         }
367         pads = g_list_next (pads);
368       }
369     } while (pads != NULL);
370     if (already_iterated == NULL) {
371       GST_DEBUG_OBJECT (SCHED (element), "nothing to iterate for element %s",
372           GST_ELEMENT_NAME (element));
373       break;
374     }
375     g_slist_free (already_iterated);
376     already_iterated = NULL;
377   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
378
379   GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
380
381   /* due to oddities in the cothreads code, when this function returns it will
382    * switch to the main cothread. thus, we need to unlock the current element. */
383   if (SCHED (element)) {
384     if (SCHED (element)->current && SCHED (element)->current->post_run_func) {
385       SCHED (element)->current->post_run_func (SCHED (element)->current);
386     }
387     SCHED (element)->current = NULL;
388   }
389
390   GST_CAT_DEBUG (debug_dataflow, "leaving chain wrapper of element %s", name);
391   gst_object_unref (GST_OBJECT (element));
392
393   return 0;
394 }
395
396 static int
397 gst_basic_scheduler_src_wrapper (int argc, char **argv)
398 {
399   GstElement *element = GST_ELEMENT (argv);
400   GList *pads;
401   GstRealPad *realpad;
402   GstData *data = NULL;
403   gboolean inf_loop;
404   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
405
406   GST_DEBUG ("entering src wrapper of element %s", name);
407
408   do {
409     inf_loop = TRUE;
410     pads = element->pads;
411     while (pads) {
412
413       if (!GST_IS_REAL_PAD (pads->data))
414         continue;
415
416       realpad = GST_REAL_PAD (pads->data);
417
418       pads = g_list_next (pads);
419       if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SRC) {
420         inf_loop = FALSE;
421         GST_CAT_DEBUG (debug_dataflow, "calling _getfunc for %s:%s",
422             GST_DEBUG_PAD_NAME (realpad));
423         data = gst_pad_call_get_function (GST_PAD (realpad));
424         if (data) {
425           GST_CAT_DEBUG (debug_dataflow, "calling gst_pad_push on pad %s:%s %p",
426               GST_DEBUG_PAD_NAME (realpad), data);
427           gst_pad_push (GST_PAD (realpad), data);
428         }
429       }
430     }
431   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element) && !inf_loop);
432
433   GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
434
435   /* due to oddities in the cothreads code, when this function returns it will
436    * switch to the main cothread. thus, we need to unlock the current element. */
437   if (SCHED (element)->current->post_run_func)
438     SCHED (element)->current->post_run_func (SCHED (element)->current);
439   SCHED (element)->current = NULL;
440
441   GST_DEBUG ("leaving src wrapper of element %s", name);
442
443   return 0;
444 }
445
446 static void
447 gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstData * data)
448 {
449   gint loop_count = 100;
450   GstElement *parent;
451   GstRealPad *peer;
452
453   parent = GST_PAD_PARENT (pad);
454   peer = GST_RPAD_PEER (pad);
455
456   GST_CAT_DEBUG (debug_dataflow, "putting buffer %p in peer \"%s:%s\"'s pen",
457       data, GST_DEBUG_PAD_NAME (peer));
458
459   /* 
460    * loop until the bufferpen is empty so we can fill it up again
461    */
462   while (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) != NULL && --loop_count) {
463     GST_CAT_DEBUG (debug_dataflow, "switching to %p to empty bufpen %d",
464         GST_ELEMENT_THREADSTATE (parent), loop_count);
465
466     do_element_switch (parent);
467
468     /* we may no longer be the same pad, check. */
469     if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
470       GST_CAT_DEBUG (debug_dataflow, "new pad in mid-switch!");
471       pad = (GstPad *) GST_RPAD_PEER (peer);
472     }
473     parent = GST_PAD_PARENT (pad);
474     peer = GST_RPAD_PEER (pad);
475   }
476
477   if (loop_count == 0) {
478     GST_ELEMENT_ERROR (parent, CORE, SCHEDULER, (NULL),
479         ("(internal error) basic: maximum number of switches exceeded"));
480     return;
481   }
482
483   g_assert (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) == NULL);
484
485   /* now fill the bufferpen and switch so it can be consumed */
486   GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = data;
487   GST_CAT_DEBUG (debug_dataflow, "switching to %p to consume buffer %p",
488       GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)), data);
489
490   do_element_switch (parent);
491
492   GST_CAT_DEBUG (debug_dataflow, "leaving chainhandler proxy of %s:%s",
493       GST_DEBUG_PAD_NAME (pad));
494 }
495
496 static void
497 gst_basic_scheduler_select_proxy (GstPad * pad, GstData * data)
498 {
499   GstElement *parent;
500
501   parent = GST_PAD_PARENT (pad);
502
503   GST_CAT_DEBUG (debug_dataflow, "putting buffer %p in peer's pen of pad %s:%s",
504       data, GST_DEBUG_PAD_NAME (pad));
505
506   g_assert (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) == NULL);
507   /* now fill the bufferpen and switch so it can be consumed */
508   GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = data;
509   GST_CAT_DEBUG (debug_dataflow, "switching to %p",
510       GST_ELEMENT_THREADSTATE (parent));
511   /* FIXME temporarily diabled */
512   /* parent->select_pad = pad; */
513
514   do_element_switch (parent);
515
516   GST_CAT_DEBUG (debug_dataflow, "done switching");
517 }
518
519
520 static GstData *
521 gst_basic_scheduler_gethandler_proxy (GstPad * pad)
522 {
523   GstData *data;
524   GstElement *parent;
525   GstRealPad *peer;
526
527   GST_CAT_DEBUG (debug_dataflow, "entering gethandler proxy of %s:%s",
528       GST_DEBUG_PAD_NAME (pad));
529
530   parent = GST_PAD_PARENT (pad);
531   peer = GST_RPAD_PEER (pad);
532
533   /* FIXME this should be bounded */
534   /* we will loop switching to the peer until it's filled up the bufferpen */
535   while (GST_RPAD_BUFPEN (pad) == NULL) {
536
537     GST_CAT_DEBUG (debug_dataflow, "switching to \"%s\": %p to fill bufpen",
538         GST_ELEMENT_NAME (parent), GST_ELEMENT_THREADSTATE (parent));
539
540     do_element_switch (parent);
541
542     /* we may no longer be the same pad, check. */
543     if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
544       GST_CAT_DEBUG (debug_dataflow, "new pad in mid-switch!");
545       pad = (GstPad *) GST_RPAD_PEER (peer);
546       if (!pad) {
547         GST_ELEMENT_ERROR (parent, CORE, PAD, (NULL), ("pad unlinked"));
548       }
549       parent = GST_PAD_PARENT (pad);
550       peer = GST_RPAD_PEER (pad);
551     }
552   }
553   GST_CAT_DEBUG (debug_dataflow, "done switching");
554
555   /* now grab the buffer from the pen, clear the pen, and return the buffer */
556   data = GST_RPAD_BUFPEN (pad);
557   GST_RPAD_BUFPEN (pad) = NULL;
558
559   GST_CAT_DEBUG (debug_dataflow, "leaving gethandler proxy of %s:%s",
560       GST_DEBUG_PAD_NAME (pad));
561
562   return data;
563 }
564
565 static gboolean
566 gst_basic_scheduler_eventhandler_proxy (GstPad * srcpad, GstEvent * event)
567 {
568   gboolean flush;
569
570   GST_CAT_INFO (debug_dataflow, "intercepting event %d on pad %s:%s",
571       GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (srcpad));
572
573   /* figure out if we need to flush */
574   switch (GST_EVENT_TYPE (event)) {
575     case GST_EVENT_FLUSH:
576       flush = TRUE;
577       break;
578     case GST_EVENT_SEEK:
579     case GST_EVENT_SEEK_SEGMENT:
580       flush = GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH;
581       break;
582     default:
583       flush = FALSE;
584       break;
585   }
586
587   if (flush) {
588     GstData *data = GST_RPAD_BUFPEN (srcpad);
589
590     GST_CAT_INFO (debug_dataflow, "event is flush");
591
592     if (data) {
593       GST_CAT_INFO (debug_dataflow, "need to clear some buffers");
594
595       gst_data_unref (data);
596       GST_RPAD_BUFPEN (srcpad) = NULL;
597     }
598   }
599   return GST_RPAD_EVENTFUNC (srcpad) (srcpad, event);
600 }
601
602 static gboolean
603 gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
604 {
605   GList *elements;
606   GstElement *element;
607   cothread_func wrapper_function;
608   const GList *pads;
609   GstPad *pad;
610
611   GST_DEBUG ("chain is using COTHREADS");
612
613   g_assert (chain->sched->context != NULL);
614
615   /* walk through all the chain's elements */
616   elements = chain->elements;
617   while (elements) {
618     gboolean decoupled;
619
620     element = GST_ELEMENT (elements->data);
621     elements = g_list_next (elements);
622
623     decoupled = GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED);
624
625     /* start out without a wrapper function, we select it later */
626     wrapper_function = NULL;
627
628     /* if the element has a loopfunc... */
629     if (element->loopfunc != NULL) {
630       wrapper_function =
631           GST_DEBUG_FUNCPTR (gst_basic_scheduler_loopfunc_wrapper);
632       GST_DEBUG ("element '%s' is a loop-based", GST_ELEMENT_NAME (element));
633     } else {
634       /* otherwise we need to decide what kind of cothread
635        * if it's not DECOUPLED, we decide based on 
636        * whether it's a source or not */
637       if (!decoupled) {
638         /* if it doesn't have any sinks, it must be a source (duh) */
639         if (element->numsinkpads == 0) {
640           wrapper_function =
641               GST_DEBUG_FUNCPTR (gst_basic_scheduler_src_wrapper);
642           GST_DEBUG ("element '%s' is a source, using _src_wrapper",
643               GST_ELEMENT_NAME (element));
644         } else {
645           wrapper_function =
646               GST_DEBUG_FUNCPTR (gst_basic_scheduler_chain_wrapper);
647           GST_DEBUG ("element '%s' is a filter, using _chain_wrapper",
648               GST_ELEMENT_NAME (element));
649         }
650       }
651     }
652
653     /* now we have to walk through the pads to set up their state */
654     pads = element->pads;
655     while (pads) {
656       GstPad *peerpad;
657
658       pad = GST_PAD (pads->data);
659       pads = g_list_next (pads);
660
661       if (!GST_IS_REAL_PAD (pad))
662         continue;
663
664       peerpad = GST_PAD_PEER (pad);
665       if (peerpad) {
666         GstElement *peerelement = GST_ELEMENT (GST_PAD_PARENT (peerpad));
667         gboolean different_sched =
668             (peerelement->sched != GST_SCHEDULER (chain->sched));
669         gboolean peer_decoupled =
670             GST_FLAG_IS_SET (peerelement, GST_ELEMENT_DECOUPLED);
671
672         GST_DEBUG ("inspecting pad %s:%s", GST_DEBUG_PAD_NAME (peerpad));
673
674         /* we don't need to check this for decoupled elements */
675         if (!decoupled) {
676           /* if the peer element is in another schedule, 
677            * it's not decoupled and we are not decoupled
678            * either, we have an error */
679           if (different_sched && !peer_decoupled) {
680             GST_ELEMENT_ERROR (element, CORE, SCHEDULER, (NULL),
681                 ("element \"%s\" is not decoupled but has pads in different schedulers",
682                     GST_ELEMENT_NAME (element)));
683             return FALSE;
684           }
685           /* ok, the peer is in a different scheduler and is decoupled, 
686            * we need to set the
687            * handlers so we can talk with it */
688           else if (different_sched) {
689             if (GST_RPAD_DIRECTION (peerpad) == GST_PAD_SINK) {
690               GST_DEBUG ("copying chain func into push proxy for peer %s:%s",
691                   GST_DEBUG_PAD_NAME (peerpad));
692               GST_RPAD_CHAINHANDLER (peerpad) = gst_pad_call_chain_function;
693             } else {
694               GST_DEBUG ("copying get func into pull proxy for peer %s:%s",
695                   GST_DEBUG_PAD_NAME (peerpad));
696               GST_RPAD_GETHANDLER (peerpad) = gst_pad_call_get_function;
697             }
698           }
699         }
700         /* in any case we need to copy the eventfunc into the handler */
701         GST_RPAD_EVENTHANDLER (peerpad) = GST_RPAD_EVENTFUNC (peerpad);
702       }
703
704       /* if the element is DECOUPLED or outside the manager, we have to chain */
705       if (decoupled) {
706         /* set the chain proxies */
707         if (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK) {
708           GST_DEBUG ("copying chain function into push proxy for %s:%s",
709               GST_DEBUG_PAD_NAME (pad));
710           GST_RPAD_CHAINHANDLER (pad) = gst_pad_call_chain_function;
711         } else {
712           GST_DEBUG ("copying get function into pull proxy for %s:%s",
713               GST_DEBUG_PAD_NAME (pad));
714           GST_RPAD_GETHANDLER (pad) = gst_pad_call_get_function;
715         }
716       }
717       /* otherwise we really are a cothread */
718       else {
719         if (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK) {
720           GST_DEBUG ("setting cothreaded push proxy for sinkpad %s:%s",
721               GST_DEBUG_PAD_NAME (pad));
722           GST_RPAD_CHAINHANDLER (pad) =
723               GST_DEBUG_FUNCPTR (gst_basic_scheduler_chainhandler_proxy);
724           GST_RPAD_EVENTHANDLER (pad) = GST_RPAD_EVENTFUNC (pad);
725         } else {
726           GST_DEBUG ("setting cothreaded pull proxy for srcpad %s:%s",
727               GST_DEBUG_PAD_NAME (pad));
728           GST_RPAD_GETHANDLER (pad) =
729               GST_DEBUG_FUNCPTR (gst_basic_scheduler_gethandler_proxy);
730           /* the gethandler proxy function can queue a buffer in the bufpen, we need
731            * to remove this buffer when a flush event is sent on the pad */
732           GST_RPAD_EVENTHANDLER (pad) =
733               GST_DEBUG_FUNCPTR (gst_basic_scheduler_eventhandler_proxy);
734         }
735       }
736     }
737
738     /* need to set up the cothread now */
739     if (wrapper_function != NULL) {
740       if (GST_ELEMENT_THREADSTATE (element) == NULL) {
741         GST_DEBUG ("about to create a cothread, wrapper for '%s' is &%s",
742             GST_ELEMENT_NAME (element),
743             GST_DEBUG_FUNCPTR_NAME (wrapper_function));
744         do_cothread_create (GST_ELEMENT_THREADSTATE (element),
745             chain->sched->context, wrapper_function, 0, (char **) element);
746         if (GST_ELEMENT_THREADSTATE (element) == NULL) {
747           GST_ELEMENT_ERROR (element, RESOURCE, TOO_LAZY, (NULL),
748               ("could not create cothread for \"%s\"",
749                   GST_ELEMENT_NAME (element)));
750           return FALSE;
751         }
752         GST_DEBUG ("created cothread %p for '%s'",
753             GST_ELEMENT_THREADSTATE (element), GST_ELEMENT_NAME (element));
754       } else {
755         /* set the cothread wrapper function */
756         GST_DEBUG ("about to set the wrapper function for '%s' to &%s",
757             GST_ELEMENT_NAME (element),
758             GST_DEBUG_FUNCPTR_NAME (wrapper_function));
759         do_cothread_setfunc (GST_ELEMENT_THREADSTATE (element),
760             chain->sched->context, wrapper_function, 0, (char **) element);
761         GST_DEBUG ("set wrapper function for '%s' to &%s",
762             GST_ELEMENT_NAME (element),
763             GST_DEBUG_FUNCPTR_NAME (wrapper_function));
764       }
765     }
766   }
767
768   return TRUE;
769 }
770
771 static GstSchedulerChain *
772 gst_basic_scheduler_chain_new (GstBasicScheduler * sched)
773 {
774   GstSchedulerChain *chain = g_new (GstSchedulerChain, 1);
775
776   /* initialize the chain with sane values */
777   chain->sched = sched;
778   chain->disabled = NULL;
779   chain->elements = NULL;
780   chain->num_elements = 0;
781   chain->entry = NULL;
782   chain->cothreaded_elements = 0;
783   chain->schedule = FALSE;
784
785   /* add the chain to the schedulers' list of chains */
786   sched->chains = g_list_prepend (sched->chains, chain);
787   sched->num_chains++;
788
789   /* notify the scheduler that something changed */
790   GST_FLAG_SET (sched, GST_BASIC_SCHEDULER_CHANGE);
791
792   GST_INFO ("created new chain %p, now are %d chains in sched %p",
793       chain, sched->num_chains, sched);
794
795   return chain;
796 }
797
798 static void
799 gst_basic_scheduler_chain_destroy (GstSchedulerChain * chain)
800 {
801   GstBasicScheduler *sched = chain->sched;
802
803   /* remove the chain from the schedulers' list of chains */
804   sched->chains = g_list_remove (sched->chains, chain);
805   sched->num_chains--;
806
807   /* destroy the chain */
808   g_list_free (chain->disabled);        /* should be empty... */
809   g_list_free (chain->elements);        /* ditto              */
810
811   GST_INFO ("destroyed chain %p, now are %d chains in sched %p", chain,
812       sched->num_chains, sched);
813
814   g_free (chain);
815
816   /* notify the scheduler that something changed */
817   GST_FLAG_SET (sched, GST_BASIC_SCHEDULER_CHANGE);
818 }
819
820 static void
821 gst_basic_scheduler_chain_add_element (GstSchedulerChain * chain,
822     GstElement * element)
823 {
824   /* set the sched pointer for the element */
825   element->sched = GST_SCHEDULER (chain->sched);
826
827   /* add the element to either the main list or the disabled list */
828   if (GST_STATE (element) == GST_STATE_PLAYING) {
829     GST_INFO ("adding element \"%s\" to chain %p enabled",
830         GST_ELEMENT_NAME (element), chain);
831     chain->elements = g_list_prepend (chain->elements, element);
832   } else {
833     GST_INFO ("adding element \"%s\" to chain %p disabled",
834         GST_ELEMENT_NAME (element), chain);
835     chain->disabled = g_list_prepend (chain->disabled, element);
836   }
837   chain->num_elements++;
838
839   /* notify the scheduler that something changed */
840   GST_FLAG_SET (chain->sched, GST_BASIC_SCHEDULER_CHANGE);
841 }
842
843 static gboolean
844 gst_basic_scheduler_chain_enable_element (GstSchedulerChain * chain,
845     GstElement * element)
846 {
847   GST_INFO ("enabling element \"%s\" in chain %p",
848       GST_ELEMENT_NAME (element), chain);
849
850   /* remove from disabled list */
851   chain->disabled = g_list_remove (chain->disabled, element);
852
853   /* add to elements list */
854   chain->elements = g_list_prepend (chain->elements, element);
855
856   /* notify the scheduler that something changed */
857   GST_FLAG_SET (chain->sched, GST_BASIC_SCHEDULER_CHANGE);
858   /* GST_FLAG_UNSET(element, GST_ELEMENT_COTHREAD_STOPPING); */
859
860   /* reschedule the chain */
861   return gst_basic_scheduler_cothreaded_chain (GST_BIN (GST_SCHEDULER (chain->
862               sched)->parent), chain);
863 }
864
865 static void
866 gst_basic_scheduler_chain_disable_element (GstSchedulerChain * chain,
867     GstElement * element)
868 {
869   GST_INFO ("disabling element \"%s\" in chain %p",
870       GST_ELEMENT_NAME (element), chain);
871
872   /* remove from elements list */
873   chain->elements = g_list_remove (chain->elements, element);
874
875   /* add to disabled list */
876   chain->disabled = g_list_prepend (chain->disabled, element);
877
878   /* notify the scheduler that something changed */
879   GST_FLAG_SET (chain->sched, GST_BASIC_SCHEDULER_CHANGE);
880   GST_FLAG_SET (element, GST_ELEMENT_COTHREAD_STOPPING);
881
882   /* reschedule the chain */
883 /* FIXME this should be done only if manager state != NULL */
884 /*  gst_basic_scheduler_cothreaded_chain(GST_BIN(chain->sched->parent),chain); */
885 }
886
887 static void
888 gst_basic_scheduler_chain_remove_element (GstSchedulerChain * chain,
889     GstElement * element)
890 {
891   GST_INFO ("removing element \"%s\" from chain %p", GST_ELEMENT_NAME (element),
892       chain);
893
894   /* if it's active, deactivate it */
895   if (g_list_find (chain->elements, element)) {
896     gst_basic_scheduler_chain_disable_element (chain, element);
897   }
898   /* we have to check for a threadstate here because a queue doesn't have one */
899   if (GST_ELEMENT_THREADSTATE (element)) {
900     do_cothread_destroy (GST_ELEMENT_THREADSTATE (element));
901     GST_ELEMENT_THREADSTATE (element) = NULL;
902   }
903
904   /* remove the element from the list of elements */
905   chain->disabled = g_list_remove (chain->disabled, element);
906   chain->num_elements--;
907
908   /* notify the scheduler that something changed */
909   GST_FLAG_SET (chain->sched, GST_BASIC_SCHEDULER_CHANGE);
910
911   /* if there are no more elements in the chain, destroy the chain */
912   if (chain->num_elements == 0)
913     gst_basic_scheduler_chain_destroy (chain);
914
915 }
916
917 static void
918 gst_basic_scheduler_chain_elements (GstBasicScheduler * sched,
919     GstElement * element1, GstElement * element2)
920 {
921   GList *chains;
922   GstSchedulerChain *chain;
923   GstSchedulerChain *chain1 = NULL, *chain2 = NULL;
924   GstElement *element;
925
926   /* first find the chains that hold the two  */
927   chains = sched->chains;
928   while (chains) {
929     chain = (GstSchedulerChain *) (chains->data);
930     chains = g_list_next (chains);
931
932     if (g_list_find (chain->disabled, element1))
933       chain1 = chain;
934     else if (g_list_find (chain->elements, element1))
935       chain1 = chain;
936
937     if (g_list_find (chain->disabled, element2))
938       chain2 = chain;
939     else if (g_list_find (chain->elements, element2))
940       chain2 = chain;
941   }
942
943   /* first check to see if they're in the same chain, we're done if that's the case */
944   if ((chain1 != NULL) && (chain1 == chain2)) {
945     GST_INFO ("elements are already in the same chain");
946     return;
947   }
948
949   /* now, if neither element has a chain, create one */
950   if ((chain1 == NULL) && (chain2 == NULL)) {
951     GST_INFO ("creating new chain to hold two new elements");
952     chain = gst_basic_scheduler_chain_new (sched);
953     gst_basic_scheduler_chain_add_element (chain, element1);
954     gst_basic_scheduler_chain_add_element (chain, element2);
955     /* FIXME chain changed here */
956 /*    gst_basic_scheduler_cothreaded_chain(chain->sched->parent,chain); */
957
958     /* otherwise if both have chains already, join them */
959   } else if ((chain1 != NULL) && (chain2 != NULL)) {
960     GST_INFO ("merging chain %p into chain %p", chain2, chain1);
961     /* take the contents of chain2 and merge them into chain1 */
962     chain1->disabled =
963         g_list_concat (chain1->disabled, g_list_copy (chain2->disabled));
964     chain1->elements =
965         g_list_concat (chain1->elements, g_list_copy (chain2->elements));
966     chain1->num_elements += chain2->num_elements;
967     gst_basic_scheduler_chain_destroy (chain2);
968     if (sched->context)
969
970       gst_basic_scheduler_cothreaded_chain (GST_BIN (GST_SCHEDULER (chain1->
971                   sched)->parent), chain1);
972
973     /* otherwise one has a chain already, the other doesn't */
974   } else {
975     /* pick out which one has the chain, and which doesn't */
976     if (chain1 != NULL)
977       chain = chain1, element = element2;
978     else
979       chain = chain2, element = element1;
980
981     GST_INFO ("adding element to existing chain");
982     gst_basic_scheduler_chain_add_element (chain, element);
983     /* FIXME chain changed here */
984 /*    gst_basic_scheduler_cothreaded_chain(chain->sched->parent,chain); */
985   }
986
987 }
988
989
990 /* find the chain within the scheduler that holds the element, if any */
991 static GstSchedulerChain *
992 gst_basic_scheduler_find_chain (GstBasicScheduler * sched, GstElement * element)
993 {
994   GList *chains;
995   GstSchedulerChain *chain;
996
997   GST_INFO ("searching for element \"%s\" in chains",
998       GST_ELEMENT_NAME (element));
999
1000   chains = sched->chains;
1001   while (chains) {
1002     chain = (GstSchedulerChain *) (chains->data);
1003     chains = g_list_next (chains);
1004
1005     if (g_list_find (chain->elements, element))
1006       return chain;
1007     if (g_list_find (chain->disabled, element))
1008       return chain;
1009   }
1010
1011   return NULL;
1012 }
1013
1014 static void
1015 gst_basic_scheduler_chain_recursive_add (GstSchedulerChain * chain,
1016     GstElement * element, gboolean remove)
1017 {
1018   GList *pads;
1019   GstPad *pad;
1020   GstElement *peerelement;
1021   GstSchedulerChain *prevchain;
1022
1023   /* check to see if it's in a chain already */
1024   prevchain = gst_basic_scheduler_find_chain (chain->sched, element);
1025   /* if it's already in another chain, either remove or punt */
1026   if (prevchain != NULL) {
1027     if (remove == TRUE)
1028       gst_basic_scheduler_chain_remove_element (prevchain, element);
1029     else
1030       return;
1031   }
1032
1033   /* add it to this one */
1034   gst_basic_scheduler_chain_add_element (chain, element);
1035
1036   GST_DEBUG ("recursing on element \"%s\"", GST_ELEMENT_NAME (element));
1037   /* now go through all the pads and see which peers can be added */
1038   pads = element->pads;
1039   while (pads) {
1040     pad = GST_PAD (pads->data);
1041     pads = g_list_next (pads);
1042
1043     GST_DEBUG ("have pad %s:%s, checking for valid peer",
1044         GST_DEBUG_PAD_NAME (pad));
1045     /* if the peer exists and could be in the same chain */
1046     if (GST_PAD_PEER (pad)) {
1047       GST_DEBUG ("has peer %s:%s", GST_DEBUG_PAD_NAME (GST_PAD_PEER (pad)));
1048       peerelement = GST_PAD_PARENT (GST_PAD_PEER (pad));
1049       if (GST_ELEMENT_SCHEDULER (GST_PAD_PARENT (pad)) ==
1050           GST_ELEMENT_SCHEDULER (peerelement)) {
1051         GST_DEBUG ("peer \"%s\" is valid for same chain",
1052             GST_ELEMENT_NAME (peerelement));
1053         gst_basic_scheduler_chain_recursive_add (chain, peerelement, remove);
1054       }
1055     }
1056   }
1057 }
1058
1059 /*
1060  * Entry points for this scheduler.
1061  */
1062 static void
1063 gst_basic_scheduler_setup (GstScheduler * sched)
1064 {
1065   /* first create thread context */
1066   if (GST_BASIC_SCHEDULER (sched)->context == NULL) {
1067     GST_DEBUG ("initializing cothread context");
1068     GST_BASIC_SCHEDULER (sched)->context = do_cothread_context_init ();
1069   }
1070 }
1071
1072 static void
1073 gst_basic_scheduler_reset (GstScheduler * sched)
1074 {
1075   cothread_context *ctx;
1076   GList *elements = GST_BASIC_SCHEDULER (sched)->elements;
1077
1078   while (elements) {
1079     GstElement *element = GST_ELEMENT (elements->data);
1080
1081     if (GST_ELEMENT_THREADSTATE (element)) {
1082       do_cothread_destroy (GST_ELEMENT_THREADSTATE (element));
1083       GST_ELEMENT_THREADSTATE (element) = NULL;
1084     }
1085     elements = g_list_next (elements);
1086   }
1087
1088   ctx = GST_BASIC_SCHEDULER (sched)->context;
1089
1090   do_cothread_context_destroy (ctx);
1091
1092   GST_BASIC_SCHEDULER (sched)->context = NULL;
1093 }
1094
1095 static void
1096 gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
1097 {
1098   GstSchedulerChain *chain;
1099   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1100
1101   GST_INFO ("adding element \"%s\" to scheduler", GST_ELEMENT_NAME (element));
1102
1103   /* only deal with elements after this point, not bins */
1104   /* exception is made for Bin's that are schedulable, like the autoplugger */
1105   if (GST_IS_BIN (element)
1106       && !GST_FLAG_IS_SET (element, GST_BIN_SELF_SCHEDULABLE))
1107     return;
1108
1109   /* first add it to the list of elements that are to be scheduled */
1110   bsched->elements = g_list_prepend (bsched->elements, element);
1111   bsched->num_elements++;
1112
1113   /* create a chain to hold it, and add */
1114   chain = gst_basic_scheduler_chain_new (bsched);
1115   gst_basic_scheduler_chain_add_element (chain, element);
1116 }
1117
1118 static void
1119 gst_basic_scheduler_remove_element (GstScheduler * sched, GstElement * element)
1120 {
1121   GstSchedulerChain *chain;
1122   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1123
1124   if (g_list_find (bsched->elements, element)) {
1125     GST_INFO ("removing element \"%s\" from scheduler",
1126         GST_ELEMENT_NAME (element));
1127
1128     /* if we are removing the currently scheduled element */
1129     if (bsched->current == element) {
1130       GST_FLAG_SET (element, GST_ELEMENT_COTHREAD_STOPPING);
1131       if (element->post_run_func)
1132         element->post_run_func (element);
1133       bsched->current = NULL;
1134     }
1135     /* find what chain the element is in */
1136     chain = gst_basic_scheduler_find_chain (bsched, element);
1137
1138     /* remove it from its chain */
1139     if (chain != NULL) {
1140       gst_basic_scheduler_chain_remove_element (chain, element);
1141     }
1142
1143     /* remove it from the list of elements */
1144     bsched->elements = g_list_remove (bsched->elements, element);
1145     bsched->num_elements--;
1146
1147     /* unset the scheduler pointer in the element */
1148   }
1149 }
1150
1151 static GstElementStateReturn
1152 gst_basic_scheduler_state_transition (GstScheduler * sched,
1153     GstElement * element, gint transition)
1154 {
1155   GstSchedulerChain *chain;
1156   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1157
1158   /* check if our parent changed state */
1159   if (GST_SCHEDULER_PARENT (sched) == element) {
1160     GST_INFO ("parent \"%s\" changed state", GST_ELEMENT_NAME (element));
1161     if (transition == GST_STATE_PLAYING_TO_PAUSED) {
1162       GST_INFO ("setting scheduler state to stopped");
1163       GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_STOPPED;
1164     } else if (transition == GST_STATE_PAUSED_TO_PLAYING) {
1165       GST_INFO ("setting scheduler state to running");
1166       GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_RUNNING;
1167     } else {
1168       GST_INFO ("no interesting state change, doing nothing");
1169     }
1170   } else if (transition == GST_STATE_PLAYING_TO_PAUSED ||
1171       transition == GST_STATE_PAUSED_TO_PLAYING) {
1172     /* find the chain the element is in */
1173     chain = gst_basic_scheduler_find_chain (bsched, element);
1174
1175     /* remove it from the chain */
1176     if (chain) {
1177       if (transition == GST_STATE_PLAYING_TO_PAUSED) {
1178         gst_basic_scheduler_chain_disable_element (chain, element);
1179       } else if (transition == GST_STATE_PAUSED_TO_PLAYING) {
1180         if (!gst_basic_scheduler_chain_enable_element (chain, element)) {
1181           GST_INFO ("could not enable element \"%s\"",
1182               GST_ELEMENT_NAME (element));
1183           return GST_STATE_FAILURE;
1184         }
1185       }
1186     } else {
1187       GST_INFO ("element \"%s\" not found in any chain, no state change",
1188           GST_ELEMENT_NAME (element));
1189     }
1190   }
1191
1192   return GST_STATE_SUCCESS;
1193 }
1194
1195 static gboolean
1196 gst_basic_scheduler_yield (GstScheduler * sched, GstElement * element)
1197 {
1198   if (GST_ELEMENT_IS_COTHREAD_STOPPING (element)) {
1199
1200     do_switch_to_main (sched);
1201
1202     /* no need to do a pre_run, the cothread is stopping */
1203   }
1204   return FALSE;
1205 }
1206
1207 static gboolean
1208 gst_basic_scheduler_interrupt (GstScheduler * sched, GstElement * element)
1209 {
1210
1211   GST_FLAG_SET (element, GST_ELEMENT_COTHREAD_STOPPING);
1212   do_switch_to_main (sched);
1213
1214   return FALSE;
1215 }
1216
1217 static void
1218 gst_basic_scheduler_error (GstScheduler * sched, GstElement * element)
1219 {
1220   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1221
1222   if (GST_ELEMENT_THREADSTATE (element)) {
1223     GstSchedulerChain *chain;
1224
1225     chain = gst_basic_scheduler_find_chain (bsched, element);
1226     if (chain)
1227       gst_basic_scheduler_chain_disable_element (chain, element);
1228
1229     GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_ERROR;
1230
1231     do_switch_to_main (sched);
1232   }
1233 }
1234
1235 static void
1236 gst_basic_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
1237     GstPad * sinkpad)
1238 {
1239   GstElement *srcelement, *sinkelement;
1240   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1241
1242   srcelement = GST_PAD_PARENT (srcpad);
1243   g_return_if_fail (srcelement != NULL);
1244   sinkelement = GST_PAD_PARENT (sinkpad);
1245   g_return_if_fail (sinkelement != NULL);
1246
1247   GST_INFO ("have pad linked callback on %s:%s to %s:%s",
1248       GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
1249   GST_DEBUG ("srcpad sched is %p, sinkpad sched is %p",
1250       GST_ELEMENT_SCHEDULER (srcelement), GST_ELEMENT_SCHEDULER (sinkelement));
1251
1252   gst_basic_scheduler_chain_elements (bsched, srcelement, sinkelement);
1253 }
1254
1255 static void
1256 gst_basic_scheduler_pad_unlink (GstScheduler * sched, GstPad * srcpad,
1257     GstPad * sinkpad)
1258 {
1259   GstElement *element1, *element2;
1260   GstSchedulerChain *chain1, *chain2;
1261   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1262
1263   GST_INFO ("unlinking pads %s:%s and %s:%s",
1264       GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
1265
1266   /* we need to have the parent elements of each pad */
1267   element1 = GST_ELEMENT (GST_PAD_PARENT (srcpad));
1268   element2 = GST_ELEMENT (GST_PAD_PARENT (sinkpad));
1269
1270   /* first task is to remove the old chain they belonged to.
1271    * this can be accomplished by taking either of the elements,
1272    * since they are guaranteed to be in the same chain
1273    * FIXME is it potentially better to make an attempt at splitting cleaner??
1274    */
1275   chain1 = gst_basic_scheduler_find_chain (bsched, element1);
1276   chain2 = gst_basic_scheduler_find_chain (bsched, element2);
1277
1278 /* FIXME: The old code still works in most cases, but does not deal with
1279  * the problem of screwed up sched chains in some autoplugging cases.
1280  * The new code has an infinite recursion bug during pipeline shutdown,
1281  * which must be fixed before it can be enabled again.
1282  */
1283 #if 1
1284   if (chain1 != chain2) {
1285     /* elements not in the same chain don't need to be separated */
1286     GST_INFO ("elements not in the same chain");
1287     return;
1288   }
1289
1290   if (chain1) {
1291     GST_INFO ("destroying chain");
1292     gst_basic_scheduler_chain_destroy (chain1);
1293
1294     /* now create a new chain to hold element1 and build it from scratch */
1295     chain1 = gst_basic_scheduler_chain_new (bsched);
1296     gst_basic_scheduler_chain_recursive_add (chain1, element1, FALSE);
1297   }
1298
1299   /* check the other element to see if it landed in the newly created chain */
1300   if (gst_basic_scheduler_find_chain (bsched, element2) == NULL) {
1301     /* if not in chain, create chain and build from scratch */
1302     chain2 = gst_basic_scheduler_chain_new (bsched);
1303     gst_basic_scheduler_chain_recursive_add (chain2, element2, FALSE);
1304   }
1305 #else
1306
1307   /* if they're both in the same chain, move second set of elements to a new chain */
1308   if (chain1 && (chain1 == chain2)) {
1309     GST_INFO ("creating new chain for second element and peers");
1310     chain2 = gst_basic_scheduler_chain_new (bsched);
1311     gst_basic_scheduler_chain_recursive_add (chain2, element2, TRUE);
1312   }
1313 #endif
1314 }
1315
1316 static GstData *
1317 gst_basic_scheduler_pad_select (GstScheduler * sched, GstPad ** selected,
1318     GstPad ** padlist)
1319 {
1320   GstData *data = NULL;
1321   gint i = 0;
1322
1323   GST_INFO ("performing select");
1324
1325   while (padlist[i]) {
1326     GstPad *pad = padlist[i];
1327
1328     GST_RPAD_CHAINHANDLER (pad) =
1329         GST_DEBUG_FUNCPTR (gst_basic_scheduler_select_proxy);
1330   }
1331
1332   do_element_switch (GST_PAD_PARENT (GST_PAD_PEER (padlist[0])));
1333
1334   i = 0;
1335   while (padlist[i]) {
1336     GstPad *pad = padlist[i];
1337
1338     if (GST_RPAD_BUFPEN (pad)) {
1339       *selected = pad;
1340       data = GST_RPAD_BUFPEN (pad);
1341       GST_RPAD_BUFPEN (pad) = NULL;
1342     }
1343
1344     GST_RPAD_CHAINHANDLER (pad) =
1345         GST_DEBUG_FUNCPTR (gst_basic_scheduler_chainhandler_proxy);
1346   }
1347
1348   g_assert (data != NULL);
1349   return data;
1350 }
1351
1352 static GstSchedulerState
1353 gst_basic_scheduler_iterate (GstScheduler * sched)
1354 {
1355   GList *chains;
1356   GstSchedulerChain *chain;
1357   GstElement *entry;
1358   GList *elements;
1359   gint scheduled = 0;
1360   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1361
1362   GST_CAT_LOG_OBJECT (debug_dataflow, sched,
1363       "starting iteration in bin %s", GST_ELEMENT_NAME (sched->parent));
1364
1365   /* clear the changes flag */
1366   GST_FLAG_UNSET (bsched, GST_BASIC_SCHEDULER_CHANGE);
1367
1368   /* step through all the chains */
1369   chains = bsched->chains;
1370
1371   if (chains == NULL)
1372     return GST_SCHEDULER_STATE_STOPPED;
1373
1374   while (chains) {
1375     chain = (GstSchedulerChain *) (chains->data);
1376     chains = g_list_next (chains);
1377
1378     /* all we really have to do is switch to the first child            */
1379     /* FIXME this should be lots more intelligent about where to start  */
1380     GST_CAT_DEBUG (debug_dataflow,
1381         "starting iteration via cothreads using %s scheduler", _SCHEDULER_NAME);
1382
1383     if (chain->elements) {
1384       entry = NULL;             /*MattH ADDED? */
1385       GST_DEBUG ("there are %d elements in this chain", chain->num_elements);
1386       elements = chain->elements;
1387       while (elements) {
1388         entry = GST_ELEMENT (elements->data);
1389         elements = g_list_next (elements);
1390         if (GST_FLAG_IS_SET (entry, GST_ELEMENT_DECOUPLED)) {
1391           GST_DEBUG ("entry \"%s\" is DECOUPLED, skipping",
1392               GST_ELEMENT_NAME (entry));
1393           entry = NULL;
1394         } else if (GST_FLAG_IS_SET (entry, GST_ELEMENT_INFINITE_LOOP)) {
1395           GST_DEBUG ("entry \"%s\" is not valid, skipping",
1396               GST_ELEMENT_NAME (entry));
1397           entry = NULL;
1398         } else
1399           break;
1400       }
1401       if (entry) {
1402         GstSchedulerState state;
1403
1404         GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
1405
1406         GST_CAT_DEBUG (debug_dataflow,
1407             "set COTHREAD_STOPPING flag on \"%s\"(@%p)",
1408             GST_ELEMENT_NAME (entry), entry);
1409         if (GST_ELEMENT_THREADSTATE (entry)) {
1410
1411           do_switch_from_main (entry);
1412
1413           state = GST_SCHEDULER_STATE (sched);
1414           /* if something changed, return - go on else */
1415           if (GST_FLAG_IS_SET (bsched, GST_BASIC_SCHEDULER_CHANGE) &&
1416               state != GST_SCHEDULER_STATE_ERROR)
1417             return GST_SCHEDULER_STATE_RUNNING;
1418         } else {
1419           GST_CAT_DEBUG (debug_dataflow,
1420               "cothread switch not possible, element has no threadstate");
1421           return GST_SCHEDULER_STATE_ERROR;
1422         }
1423
1424         /* following is a check to see if the chain was interrupted due to a
1425          * top-half state_change().  (i.e., if there's a pending state.)
1426          *
1427          * if it was, return to gstthread.c::gst_thread_main_loop() to
1428          * execute the state change.
1429          */
1430         GST_CAT_DEBUG (debug_dataflow, "cothread switch ended or interrupted");
1431
1432         if (state != GST_SCHEDULER_STATE_RUNNING) {
1433           GST_CAT_INFO (debug_dataflow, "scheduler is not running, in state %d",
1434               state);
1435           return state;
1436         }
1437
1438         scheduled++;
1439       } else {
1440         GST_CAT_INFO (debug_dataflow,
1441             "no entry in this chain, trying the next one");
1442       }
1443     } else {
1444       GST_CAT_INFO (debug_dataflow,
1445           "no enabled elements in this chain, trying the next one");
1446     }
1447   }
1448
1449   GST_CAT_LOG_OBJECT (debug_dataflow, sched, "leaving (%s)",
1450       GST_ELEMENT_NAME (sched->parent));
1451   if (scheduled == 0) {
1452     GST_CAT_INFO (debug_dataflow, "nothing was scheduled, return STOPPED");
1453     return GST_SCHEDULER_STATE_STOPPED;
1454   } else {
1455     GST_CAT_INFO (debug_dataflow, "scheduler still running, return RUNNING");
1456     return GST_SCHEDULER_STATE_RUNNING;
1457   }
1458 }
1459
1460
1461 static void
1462 gst_basic_scheduler_show (GstScheduler * sched)
1463 {
1464   GList *chains, *elements;
1465   GstElement *element;
1466   GstSchedulerChain *chain;
1467   GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
1468
1469   if (sched == NULL) {
1470     g_print ("scheduler doesn't exist for this element\n");
1471     return;
1472   }
1473
1474   g_return_if_fail (GST_IS_SCHEDULER (sched));
1475
1476   g_print ("SCHEDULER DUMP FOR MANAGING BIN \"%s\"\n",
1477       GST_ELEMENT_NAME (sched->parent));
1478
1479   g_print ("scheduler has %d elements in it: ", bsched->num_elements);
1480   elements = bsched->elements;
1481   while (elements) {
1482     element = GST_ELEMENT (elements->data);
1483     elements = g_list_next (elements);
1484
1485     g_print ("%s, ", GST_ELEMENT_NAME (element));
1486   }
1487   g_print ("\n");
1488
1489   g_print ("scheduler has %d chains in it\n", bsched->num_chains);
1490   chains = bsched->chains;
1491   while (chains) {
1492     chain = (GstSchedulerChain *) (chains->data);
1493     chains = g_list_next (chains);
1494
1495     g_print ("%p: ", chain);
1496
1497     elements = chain->disabled;
1498     while (elements) {
1499       element = GST_ELEMENT (elements->data);
1500       elements = g_list_next (elements);
1501
1502       g_print ("!%s, ", GST_ELEMENT_NAME (element));
1503     }
1504
1505     elements = chain->elements;
1506     while (elements) {
1507       element = GST_ELEMENT (elements->data);
1508       elements = g_list_next (elements);
1509
1510       g_print ("%s, ", GST_ELEMENT_NAME (element));
1511     }
1512     g_print ("\n");
1513   }
1514 }