Removed the old eos and qos functions.
[platform/upstream/gstreamer.git] / gst / gstscheduler.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstscheduler.c: Default scheduling code for most cases
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 //#define GST_DEBUG_ENABLED
24 #include "gst_private.h"
25
26 #include "gstscheduler.h"
27
28
29 static int
30 gst_schedule_loopfunc_wrapper (int argc,char *argv[])
31 {
32   GstElement *element = GST_ELEMENT (argv);
33   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
34
35   GST_DEBUG_ENTER("(%d,'%s')",argc,name);
36
37   do {
38     GST_DEBUG (GST_CAT_DATAFLOW,"calling loopfunc %s for element %s\n",
39                GST_DEBUG_FUNCPTR_NAME (element->loopfunc),name);
40     (element->loopfunc) (element);
41     GST_DEBUG (GST_CAT_DATAFLOW,"element %s ended loop function\n", name);
42   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
43   GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
44
45   GST_DEBUG_LEAVE("(%d,'%s')",argc,name);
46   return 0;
47 }
48
49 static int
50 gst_schedule_chain_wrapper (int argc,char *argv[])
51 {
52   GstElement *element = GST_ELEMENT (argv);
53   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
54
55   GST_DEBUG_ENTER("(\"%s\")",name);
56
57   GST_DEBUG (GST_CAT_DATAFLOW,"stepping through pads\n");
58
59   do {
60     GList *pads = element->pads;
61     
62     while (pads) {
63       GstPad *pad = GST_PAD (pads->data);
64       GstRealPad *realpad;
65
66       pads = g_list_next (pads);
67       if (!GST_IS_REAL_PAD(pad)) 
68         continue;
69       realpad = GST_REAL_PAD(pad);
70       if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SINK) {
71         GstBuffer *buf;
72
73         GST_DEBUG (GST_CAT_DATAFLOW,"pulling a buffer from %s:%s\n", name, GST_PAD_NAME (pad));
74         buf = gst_pad_pull (pad);
75         GST_DEBUG (GST_CAT_DATAFLOW,"calling chain function of %s:%s\n", name, GST_PAD_NAME (pad));
76         if (buf) 
77           GST_RPAD_CHAINFUNC(realpad) (pad,buf);
78         GST_DEBUG (GST_CAT_DATAFLOW,"calling chain function of %s:%s done\n", name, GST_PAD_NAME (pad));
79       }
80     }
81   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
82   GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
83
84   GST_DEBUG_LEAVE("(%d,'%s')",argc,name);
85   return 0;
86 }
87
88 static int
89 gst_schedule_src_wrapper (int argc,char *argv[])
90 {
91   GstElement *element = GST_ELEMENT (argv);
92   GList *pads;
93   GstRealPad *realpad;
94   GstBuffer *buf = NULL;
95   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
96
97   GST_DEBUG_ENTER("(%d,\"%s\")",argc,name);
98
99   do {
100     pads = element->pads;
101     while (pads) {
102       if (!GST_IS_REAL_PAD(pads->data)) continue;
103       realpad = (GstRealPad*)(pads->data);
104       pads = g_list_next(pads);
105       if (GST_RPAD_DIRECTION(realpad) == GST_PAD_SRC) {
106         GST_DEBUG (GST_CAT_DATAFLOW,"calling _getfunc for %s:%s\n",GST_DEBUG_PAD_NAME(realpad));
107         if (realpad->regiontype != GST_REGION_VOID) {
108           g_return_val_if_fail (GST_RPAD_GETREGIONFUNC(realpad) != NULL, 0);
109 //          if (GST_RPAD_GETREGIONFUNC(realpad) == NULL)
110 //            fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name);
111 //          else
112           buf = (GST_RPAD_GETREGIONFUNC(realpad))((GstPad*)realpad,realpad->regiontype,realpad->offset,realpad->len);
113           realpad->regiontype = GST_REGION_VOID;
114         } else {
115           g_return_val_if_fail (GST_RPAD_GETFUNC(realpad) != NULL, 0);
116 //          if (GST_RPAD_GETFUNC(realpad) == NULL)
117 //            fprintf(stderr,"error, no getfunc in \"%s\"\n", name);
118 //          else
119           buf = GST_RPAD_GETFUNC(realpad) ((GstPad*)realpad);
120         }
121
122         GST_DEBUG (GST_CAT_DATAFLOW,"calling gst_pad_push on pad %s:%s\n",GST_DEBUG_PAD_NAME(realpad));
123         gst_pad_push ((GstPad*)realpad, buf);
124       }
125     }
126   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
127   GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
128
129   GST_DEBUG_LEAVE("");
130   return 0;
131 }
132
133 static void
134 gst_schedule_chainhandler_proxy (GstPad *pad, GstBuffer *buf)
135 {
136   GstRealPad *peer = GST_RPAD_PEER(pad);
137
138   GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
139   GST_DEBUG (GST_CAT_DATAFLOW,"putting buffer %p in peer \"%s:%s\"'s pen\n",buf,GST_DEBUG_PAD_NAME(peer));
140
141   // FIXME this should be bounded
142   // loop until the bufferpen is empty so we can fill it up again
143   while (GST_RPAD_BUFPEN(pad) != NULL) {
144     GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to empty bufpen\n",
145                GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
146     cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
147
148     // we may no longer be the same pad, check.
149     if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
150       GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
151       pad = (GstPad *)GST_RPAD_PEER(peer);
152     }
153   }
154
155   g_assert (GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) == NULL);
156   // now fill the bufferpen and switch so it can be consumed
157   GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) = buf;
158   GST_DEBUG (GST_CAT_DATAFLOW,"switching to %p\n",GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
159   cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
160
161   GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
162 }
163
164 static void
165 gst_schedule_select_proxy (GstPad *pad, GstBuffer *buf)
166 {
167   GstRealPad *peer = GST_RPAD_PEER(pad);
168
169   g_print ("select proxy (%s:%s)\n",GST_DEBUG_PAD_NAME(pad));
170
171   GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
172
173   GST_DEBUG (GST_CAT_DATAFLOW,"putting buffer %p in peer's pen\n",buf);
174
175   g_assert (GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) == NULL);
176   // now fill the bufferpen and switch so it can be consumed
177   GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) = buf;
178   GST_DEBUG (GST_CAT_DATAFLOW,"switching to %p\n",GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
179   g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)), gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
180   GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad = pad;
181   GST_FLAG_UNSET(GST_PAD_PARENT (pad), GST_ELEMENT_COTHREAD_STOPPING);
182   cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
183
184   g_print ("done switching\n");
185   GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
186 }
187
188
189 static GstBuffer*
190 gst_schedule_gethandler_proxy (GstPad *pad)
191 {
192   GstBuffer *buf;
193   GstRealPad *peer = GST_RPAD_PEER(pad);
194
195   GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
196
197   // FIXME this should be bounded
198   // we will loop switching to the peer until it's filled up the bufferpen
199   while (GST_RPAD_BUFPEN(pad) == NULL) {
200     GST_DEBUG (GST_CAT_DATAFLOW, "switching to \"%s\": %p to fill bufpen\n",
201                GST_ELEMENT_NAME(GST_ELEMENT(GST_PAD_PARENT(pad))),
202                GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
203     cothread_switch (GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
204
205     // we may no longer be the same pad, check.
206     if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
207       GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
208       pad = (GstPad *)GST_RPAD_PEER(peer);
209     }
210   }
211   GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
212
213   // now grab the buffer from the pen, clear the pen, and return the buffer
214   buf = GST_RPAD_BUFPEN(pad);
215   GST_RPAD_BUFPEN(pad) = NULL;
216   return buf;
217 }
218
219 static GstBuffer*
220 gst_schedule_pullregionfunc_proxy (GstPad *pad,GstRegionType type,guint64 offset,guint64 len)
221 {
222   GstBuffer *buf;
223   GstRealPad *peer = GST_RPAD_PEER(pad);
224
225   GST_DEBUG_ENTER("%s:%s,%d,%lld,%lld",GST_DEBUG_PAD_NAME(pad),type,offset,len);
226
227   // put the region info into the pad
228   GST_RPAD_REGIONTYPE(pad) = type;
229   GST_RPAD_OFFSET(pad) = offset;
230   GST_RPAD_LEN(pad) = len;
231
232   // FIXME this should be bounded
233   // we will loop switching to the peer until it's filled up the bufferpen
234   while (GST_RPAD_BUFPEN(pad) == NULL) {
235     GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to fill bufpen\n",
236                GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
237     cothread_switch (GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
238
239     // we may no longer be the same pad, check.
240     if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
241       GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
242       pad = (GstPad *)GST_RPAD_PEER(peer);
243     }
244   }
245   GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
246
247   // now grab the buffer from the pen, clear the pen, and return the buffer
248   buf = GST_RPAD_BUFPEN(pad);
249   GST_RPAD_BUFPEN(pad) = NULL;
250   return buf;
251 }
252
253
254 static void
255 gst_schedule_cothreaded_chain (GstBin *bin, GstScheduleChain *chain) {
256   GList *elements;
257   GstElement *element;
258   cothread_func wrapper_function;
259   GList *pads;
260   GstPad *pad;
261
262   GST_DEBUG (GST_CAT_SCHEDULING,"chain is using COTHREADS\n");
263
264   // first create thread context
265   if (bin->threadcontext == NULL) {
266     GST_DEBUG (GST_CAT_SCHEDULING,"initializing cothread context\n");
267     bin->threadcontext = cothread_init ();
268   }
269
270   // walk through all the chain's elements
271   elements = chain->elements;
272   while (elements) {
273     element = GST_ELEMENT (elements->data);
274     elements = g_list_next (elements);
275
276     // start out without a wrapper function, we select it later
277     wrapper_function = NULL;
278
279     // if the element has a loopfunc...
280     if (element->loopfunc != NULL) {
281       wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_loopfunc_wrapper);
282       GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a loop-based\n",GST_ELEMENT_NAME(element));
283     } else {
284       // otherwise we need to decide what kind of cothread
285       // if it's not DECOUPLED, we decide based on whether it's a source or not
286       if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
287         // if it doesn't have any sinks, it must be a source (duh)
288         if (element->numsinkpads == 0) {
289           wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_src_wrapper);
290           GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a source, using _src_wrapper\n",GST_ELEMENT_NAME(element));
291         } else {
292           wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_chain_wrapper);
293           GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a filter, using _chain_wrapper\n",GST_ELEMENT_NAME(element));
294         }
295       }
296     }
297
298     // now we have to walk through the pads to set up their state
299     pads = gst_element_get_pad_list (element);
300     while (pads) {
301       pad = GST_PAD (pads->data);
302       pads = g_list_next (pads);
303       if (!GST_IS_REAL_PAD(pad)) continue;
304
305       // if the element is DECOUPLED or outside the manager, we have to chain
306       if ((wrapper_function == NULL) ||
307           (GST_RPAD_PEER(pad) &&
308            (GST_ELEMENT (GST_PAD_PARENT (GST_PAD (GST_RPAD_PEER (pad))))->sched != chain->sched))
309          ) {
310         // set the chain proxies
311         if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
312           GST_DEBUG (GST_CAT_SCHEDULING,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
313           GST_RPAD_CHAINHANDLER(pad) = GST_RPAD_CHAINFUNC(pad);
314         } else {
315           GST_DEBUG (GST_CAT_SCHEDULING,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
316           GST_RPAD_GETHANDLER(pad) = GST_RPAD_GETFUNC(pad);
317           GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
318         }
319
320       // otherwise we really are a cothread
321       } else {
322         if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
323           GST_DEBUG (GST_CAT_SCHEDULING,"setting cothreaded push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
324           GST_RPAD_CHAINHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_schedule_chainhandler_proxy);
325         } else {
326           GST_DEBUG (GST_CAT_SCHEDULING,"setting cothreaded pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
327           GST_RPAD_GETHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_schedule_gethandler_proxy);
328           GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_schedule_pullregionfunc_proxy);
329         }
330       }
331     }
332
333     // need to set up the cothread now
334     if (wrapper_function != NULL) {
335       if (element->threadstate == NULL) {
336         // FIXME handle cothread_create returning NULL
337         element->threadstate = cothread_create (bin->threadcontext);
338         GST_DEBUG (GST_CAT_SCHEDULING,"created cothread %p for '%s'\n",element->threadstate,GST_ELEMENT_NAME(element));
339       }
340       cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
341       GST_DEBUG (GST_CAT_SCHEDULING,"set wrapper function for '%s' to &%s\n",GST_ELEMENT_NAME(element),
342             GST_DEBUG_FUNCPTR_NAME(wrapper_function));
343     }
344   }
345 }
346
347 G_GNUC_UNUSED static void
348 gst_schedule_chained_chain (GstBin *bin, _GstBinChain *chain) {
349   GList *elements;
350   GstElement *element;
351   GList *pads;
352   GstPad *pad;
353
354   GST_DEBUG (GST_CAT_SCHEDULING,"chain entered\n");
355   // walk through all the elements
356   elements = chain->elements;
357   while (elements) {
358     element = GST_ELEMENT (elements->data);
359     elements = g_list_next (elements);
360
361     // walk through all the pads
362     pads = gst_element_get_pad_list (element);
363     while (pads) {
364       pad = GST_PAD (pads->data);
365       pads = g_list_next (pads);
366       if (!GST_IS_REAL_PAD(pad)) continue;
367
368       if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
369         GST_DEBUG (GST_CAT_SCHEDULING,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
370         GST_RPAD_CHAINHANDLER(pad) = GST_RPAD_CHAINFUNC(pad);
371       } else {
372         GST_DEBUG (GST_CAT_SCHEDULING,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
373         GST_RPAD_GETHANDLER(pad) = GST_RPAD_GETFUNC(pad);
374         GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
375       }
376     }
377   }
378 }
379
380 /* depracated!! */
381 static void
382 gst_bin_schedule_cleanup (GstBin *bin)
383 {
384   GList *chains;
385   _GstBinChain *chain;
386
387   chains = bin->chains;
388   while (chains) {
389     chain = (_GstBinChain *)(chains->data);
390     chains = g_list_next(chains);
391
392 //    g_list_free(chain->disabled);
393     g_list_free(chain->elements);
394     g_list_free(chain->entries);
395
396     g_free(chain);
397   }
398   g_list_free(bin->chains);
399
400   bin->chains = NULL;
401 }
402
403 static void
404 gst_scheduler_handle_eos (GstElement *element, _GstBinChain *chain)
405 {
406   GST_DEBUG (GST_CAT_SCHEDULING,"chain removed from scheduler, EOS from element \"%s\"\n", GST_ELEMENT_NAME (element));
407   chain->need_scheduling = FALSE;
408 }
409
410 /*
411 void gst_bin_schedule_func(GstBin *bin) {
412   GList *elements;
413   GstElement *element;
414   GSList *pending = NULL;
415   GList *pads;
416   GstPad *pad;
417   GstElement *peerparent;
418   GList *chains;
419   GstScheduleChain *chain;
420
421   GST_DEBUG_ENTER("(\"%s\")",GST_ELEMENT_NAME (GST_ELEMENT (bin)));
422
423   gst_bin_schedule_cleanup(bin);
424
425   // next we have to find all the separate scheduling chains
426   GST_DEBUG (GST_CAT_SCHEDULING,"attempting to find scheduling chains...\n");
427   // first make a copy of the managed_elements we can mess with
428   elements = g_list_copy (bin->managed_elements);
429   // we have to repeat until the list is empty to get all chains
430   while (elements) {
431     element = GST_ELEMENT (elements->data);
432
433     // if this is a DECOUPLED element
434     if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
435       // skip this element entirely
436       GST_DEBUG (GST_CAT_SCHEDULING,"skipping '%s' because it's decoupled\n",GST_ELEMENT_NAME(element));
437       elements = g_list_next (elements);
438       continue;
439     }
440
441     GST_DEBUG (GST_CAT_SCHEDULING,"starting with element '%s'\n",GST_ELEMENT_NAME(element));
442
443     // prime the pending list with the first element off the top
444     pending = g_slist_prepend (NULL, element);
445     // and remove that one from the main list
446     elements = g_list_remove (elements, element);
447
448     // create a chain structure
449     chain = g_new0 (_GstBinChain, 1);
450     chain->need_scheduling = TRUE;
451
452     // for each pending element, walk the pipeline
453     do {
454       // retrieve the top of the stack and pop it
455       element = GST_ELEMENT (pending->data);
456       pending = g_slist_remove (pending, element);
457
458       // add ourselves to the chain's list of elements
459       GST_DEBUG (GST_CAT_SCHEDULING,"adding '%s' to chain\n",GST_ELEMENT_NAME(element));
460       chain->elements = g_list_prepend (chain->elements, element);
461       chain->num_elements++;
462       gtk_signal_connect (G_OBJECT (element), "eos", gst_scheduler_handle_eos, chain);
463       // set the cothreads flag as appropriate
464       if (GST_FLAG_IS_SET (element, GST_ELEMENT_USE_COTHREAD))
465         chain->need_cothreads = TRUE;
466       if (bin->use_cothreads == TRUE)
467         chain->need_cothreads = TRUE;
468
469       // if we're managed by the current bin, and we're not decoupled,
470       // go find all the peers and add them to the list of elements to check
471       if ((element->manager == GST_ELEMENT(bin)) &&
472           !GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
473         // remove ourselves from the outer list of all managed elements
474 //        GST_DEBUG (GST_CAT_SCHEDULING,"removing '%s' from list of possible elements\n",GST_ELEMENT_NAME(element));
475         elements = g_list_remove (elements, element);
476
477         // if this element is a source, add it as an entry
478         if (element->numsinkpads == 0) {
479           chain->entries = g_list_prepend (chain->entries, element);
480           GST_DEBUG (GST_CAT_SCHEDULING,"added '%s' as SRC entry into the chain\n",GST_ELEMENT_NAME(element));
481         }
482
483         // now we have to walk the pads to find peers
484         pads = gst_element_get_pad_list (element);
485         while (pads) {
486           pad = GST_PAD (pads->data);
487           pads = g_list_next (pads);
488           if (!GST_IS_REAL_PAD(pad)) continue;
489           GST_DEBUG (GST_CAT_SCHEDULING,"have pad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
490
491           if (GST_RPAD_PEER(pad) == NULL) continue;
492           if (GST_RPAD_PEER(pad) == NULL) GST_ERROR(pad,"peer is null!");
493           g_assert(GST_RPAD_PEER(pad) != NULL);
494           g_assert(GST_PAD_PARENT (GST_PAD(GST_RPAD_PEER(pad))) != NULL);
495
496           peerparent = GST_ELEMENT(GST_PAD_PARENT (GST_PAD(GST_RPAD_PEER(pad))));
497
498           GST_DEBUG (GST_CAT_SCHEDULING,"peer pad %p\n", GST_RPAD_PEER(pad));
499           // only bother with if the pad's peer's parent is this bin or it's DECOUPLED
500           // only add it if it's in the list of un-visited elements still
501           if ((g_list_find (elements, peerparent) != NULL) ||
502               GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED)) {
503             // add the peer element to the pending list
504             GST_DEBUG (GST_CAT_SCHEDULING,"adding '%s' to list of pending elements\n",
505                        GST_ELEMENT_NAME(peerparent));
506             pending = g_slist_prepend (pending, peerparent);
507
508             // if this is a sink pad, then the element on the other side is an entry
509             if ((GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) &&
510                 (GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED))) {
511               chain->entries = g_list_prepend (chain->entries, peerparent);
512               gtk_signal_connect (G_OBJECT (peerparent), "eos", gst_scheduler_handle_eos, chain);
513               GST_DEBUG (GST_CAT_SCHEDULING,"added '%s' as DECOUPLED entry into the chain\n",GST_ELEMENT_NAME(peerparent));
514             }
515           } else
516             GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' has already been dealt with\n",GST_ELEMENT_NAME(peerparent));
517         }
518       }
519     } while (pending);
520
521     // add the chain to the bin
522     GST_DEBUG (GST_CAT_SCHEDULING,"have chain with %d elements: ",chain->num_elements);
523     { GList *elements = chain->elements;
524       while (elements) {
525         element = GST_ELEMENT (elements->data);
526         elements = g_list_next(elements);
527         GST_DEBUG_NOPREFIX(GST_CAT_SCHEDULING,"%s, ",GST_ELEMENT_NAME(element));
528       }
529     }
530     GST_DEBUG_NOPREFIX(GST_CAT_DATAFLOW,"\n");
531     bin->chains = g_list_prepend (bin->chains, chain);
532     bin->num_chains++;
533   }
534   // free up the list in case it's full of DECOUPLED elements
535   g_list_free (elements);
536
537   GST_DEBUG (GST_CAT_SCHEDULING,"\nwe have %d chains to schedule\n",bin->num_chains);
538
539   // now we have to go through all the chains and schedule them
540   chains = bin->chains;
541   while (chains) {
542     chain = (GstScheduleChain *)(chains->data);
543     chains = g_list_next (chains);
544
545     // schedule as appropriate
546     if (chain->need_cothreads) {
547       gst_schedule_cothreaded_chain (bin,chain);
548     } else {
549       gst_schedule_chained_chain (bin,chain);
550     }
551   }
552
553   GST_DEBUG_LEAVE("(\"%s\")",GST_ELEMENT_NAME(GST_ELEMENT(bin)));
554 }
555 */
556
557
558 /*
559         // ***** check for possible connections outside
560         // get the pad's peer
561         peer = gst_pad_get_peer (pad);
562         // FIXME this should be an error condition, if not disabled
563         if (!peer) break;
564         // get the parent of the peer of the pad
565         outside = GST_ELEMENT (gst_pad_get_parent (peer));
566         // FIXME this should *really* be an error condition
567         if (!outside) break;
568         // if it's a source or connection and it's not ours...
569         if ((GST_IS_SRC (outside) || GST_IS_CONNECTION (outside)) &&
570             (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
571           if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
572             GST_DEBUG (0,"dealing with outside source element %s\n",GST_ELEMENT_NAME(outside));
573 //            GST_DEBUG (0,"PUNT: copying gethandler ptr from %s:%s to %s:%s (@ %p)\n",
574 //GST_DEBUG_PAD_NAME(pad->peer),GST_DEBUG_PAD_NAME(pad),&pad->gethandler);
575 //            pad->gethandler = pad->peer->gethandler;
576 //            GST_DEBUG (0,"PUNT: setting chainhandler proxy to fake proxy on %s:%s\n",GST_DEBUG_PAD_NAME(pad->peer));
577 //            pad->peer->chainhandler = GST_DEBUG_FUNCPTR(gst_bin_chainhandler_fake_proxy);
578             GST_RPAD_GETHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_bin_gethandler_proxy);
579             GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
580           }
581         } else {
582 */
583
584
585
586
587
588 /*
589       } else if (GST_IS_SRC (element)) {
590         GST_DEBUG (0,"adding '%s' as entry point, because it's a source\n",GST_ELEMENT_NAME (element));
591         bin->entries = g_list_prepend (bin->entries,element);
592         bin->num_entries++;
593         cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element);
594       }
595
596       pads = gst_element_get_pad_list (element);
597       while (pads) {
598         pad = GST_PAD(pads->data);
599
600         if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
601           GST_DEBUG (0,"setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
602           // set the proxy functions
603           pad->chainhandler = GST_DEBUG_FUNCPTR(gst_bin_chainhandler_proxy);
604           GST_DEBUG (0,"chainhandler %p = gst_bin_chainhandler_proxy %p\n",&pad->chainhandler,gst_bin_chainhandler_proxy);
605         } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
606           GST_DEBUG (0,"setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
607           // set the proxy functions
608           GST_RPAD_GETHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_bin_gethandler_proxy);
609           GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
610           GST_DEBUG (0,"pad->gethandler(@%p) = gst_bin_gethandler_proxy(@%p)\n",
611                 &pad->gethandler,gst_bin_gethandler_proxy);
612           pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
613         }
614         pads = g_list_next (pads);
615       }
616       elements = g_list_next (elements);
617
618       // if there are no entries, we have to pick one at random
619       if (bin->num_entries == 0)
620         bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data));
621     }
622   } else {
623     GST_DEBUG (0,"don't need cothreads, looking for entry points\n");
624     // we have to find which elements will drive an iteration
625     elements = bin->children;
626     while (elements) {
627       element = GST_ELEMENT (elements->data);
628       GST_DEBUG (0,"found element \"%s\"\n", GST_ELEMENT_NAME (element));
629       if (GST_IS_BIN (element)) {
630         gst_bin_create_plan (GST_BIN (element));
631       }
632       if (GST_IS_SRC (element)) {
633         GST_DEBUG (0,"adding '%s' as entry point, because it's a source\n",GST_ELEMENT_NAME (element));
634         bin->entries = g_list_prepend (bin->entries, element);
635         bin->num_entries++;
636       }
637
638       // go through all the pads, set pointers, and check for connections
639       pads = gst_element_get_pad_list (element);
640       while (pads) {
641         pad = GST_PAD (pads->data);
642
643         if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
644           GST_DEBUG (0,"found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad));
645
646           // copy the peer's chain function, easy enough
647           GST_DEBUG (0,"copying peer's chainfunc to %s:%s's chainhandler\n",GST_DEBUG_PAD_NAME(pad));
648           GST_RPAD_CHAINHANDLER(pad) = GST_DEBUG_FUNCPTR(GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad)));
649
650           // need to walk through and check for outside connections
651 //FIXME need to do this for all pads
652           // get the pad's peer
653           peer = GST_RPAD_PEER(pad);
654           if (!peer) {
655             GST_DEBUG (0,"found SINK pad %s has no peer\n", GST_ELEMENT_NAME (pad));
656             break;
657           }
658           // get the parent of the peer of the pad
659           outside = GST_ELEMENT (GST_PAD_PARENT(peer));
660           if (!outside) break;
661           // if it's a connection and it's not ours...
662           if (GST_IS_CONNECTION (outside) &&
663                (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
664             gst_info("gstbin: element \"%s\" is the external source Connection "
665                                     "for internal element \"%s\"\n",
666                           GST_ELEMENT_NAME (GST_ELEMENT (outside)),
667                           GST_ELEMENT_NAME (GST_ELEMENT (element)));
668             bin->entries = g_list_prepend (bin->entries, outside);
669             bin->num_entries++;
670           }
671         }
672         else {
673           GST_DEBUG (0,"found pad %s\n", GST_ELEMENT_NAME (pad));
674         }
675         pads = g_list_next (pads);
676
677       }
678       elements = g_list_next (elements);
679     }
680 */
681
682
683
684
685 /*
686   // If cothreads are needed, we need to not only find elements but
687   // set up cothread states and various proxy functions.
688   if (bin->need_cothreads) {
689     GST_DEBUG (0,"bin is using cothreads\n");
690
691     // first create thread context
692     if (bin->threadcontext == NULL) {
693       GST_DEBUG (0,"initializing cothread context\n");
694       bin->threadcontext = cothread_init ();
695     }
696
697     // walk through all the children
698     elements = bin->managed_elements;
699     while (elements) {
700       element = GST_ELEMENT (elements->data);
701       elements = g_list_next (elements);
702
703       // start out with a NULL warpper function, we'll set it if we want a cothread
704       wrapper_function = NULL;
705
706       // have to decide if we need to or can use a cothreads, and if so which wrapper
707       // first of all, if there's a loopfunc, the decision's already made
708       if (element->loopfunc != NULL) {
709         wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper);
710         GST_DEBUG (0,"element %s is a loopfunc, must use a cothread\n",GST_ELEMENT_NAME (element));
711       } else {
712         // otherwise we need to decide if it needs a cothread
713         // if it's complex, or cothreads are preferred and it's *not* decoupled, cothread it
714         if (GST_FLAG_IS_SET (element,GST_ELEMENT_COMPLEX) ||
715             (GST_FLAG_IS_SET (bin,GST_BIN_FLAG_PREFER_COTHREADS) &&
716              !GST_FLAG_IS_SET (element,GST_ELEMENT_DECOUPLED))) {
717           // base it on whether we're going to loop through source or sink pads
718           if (element->numsinkpads == 0)
719             wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper);
720           else
721             wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper);
722         }
723       }
724
725       // walk through the all the pads for this element, setting proxy functions
726       // the selection of proxy functions depends on whether we're in a cothread or not
727       pads = gst_element_get_pad_list (element);
728       while (pads) {
729         pad = GST_PAD (pads->data);
730         pads = g_list_next (pads);
731
732         // check to see if someone else gets to set up the element
733         peer_manager = GST_ELEMENT((pad)->peer->parent)->manager;
734         if (peer_manager != GST_ELEMENT(bin)) {
735           GST_DEBUG (0,"WARNING: pad %s:%s is connected outside of bin\n",GST_DEBUG_PAD_NAME(pad));
736         }
737
738         // if the wrapper_function is set, we need to use the proxy functions
739         if (wrapper_function != NULL) {
740           // set up proxy functions
741           if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
742             GST_DEBUG (0,"setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
743             pad->chainhandler = GST_DEBUG_FUNCPTR(gst_bin_chainhandler_proxy);
744           } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
745             GST_DEBUG (0,"setting pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
746             GST_RPAD_GETHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_bin_gethandler_proxy);
747             GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
748           }
749         } else {
750           // otherwise we need to set up for 'traditional' chaining
751           if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
752             // we can just copy the chain function, since it shares the prototype
753             GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",
754                   GST_DEBUG_PAD_NAME(pad));
755             pad->chainhandler = pad->chainfunc;
756           } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
757             // we can just copy the get function, since it shares the prototype
758             GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",
759                   GST_DEBUG_PAD_NAME(pad));
760             pad->gethandler = pad->getfunc;
761           }
762         }
763       }
764
765       // if a loopfunc has been specified, create and set up a cothread
766       if (wrapper_function != NULL) {
767         if (element->threadstate == NULL) {
768           element->threadstate = cothread_create (bin->threadcontext);
769           GST_DEBUG (0,"created cothread %p (@%p) for \"%s\"\n",element->threadstate,
770                 &element->threadstate,GST_ELEMENT_NAME (element));
771         }
772         cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
773         GST_DEBUG (0,"set wrapper function for \"%s\" to &%s\n",GST_ELEMENT_NAME (element),
774               GST_DEBUG_FUNCPTR_NAME(wrapper_function));
775       }
776
777 //      // HACK: if the element isn't decoupled, it's an entry
778 //      if (!GST_FLAG_IS_SET(element,GST_ELEMENT_DECOUPLED))
779 //        bin->entries = g_list_append(bin->entries, element);
780     }
781
782   // otherwise, cothreads are not needed
783   } else {
784     GST_DEBUG (0,"bin is chained, no cothreads needed\n");
785
786     elements = bin->managed_elements;
787     while (elements) {
788       element = GST_ELEMENT (elements->data);
789       elements = g_list_next (elements);
790
791       pads = gst_element_get_pad_list (element);
792       while (pads) {
793         pad = GST_PAD (pads->data);
794         pads = g_list_next (pads);
795
796         if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
797           GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
798           pad->chainhandler = pad->chainfunc;
799         } else {
800           GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
801           pad->gethandler = pad->getfunc;
802         }
803       }
804     }
805   }
806 */
807
808 static void 
809 gst_schedule_lock_element (GstSchedule *sched,GstElement *element)
810 {
811   if (element->threadstate)
812     cothread_lock(element->threadstate);
813 }
814
815 static void
816 gst_schedule_unlock_element (GstSchedule *sched,GstElement *element)
817 {
818   if (element->threadstate)
819     cothread_unlock(element->threadstate);
820 }
821
822
823 /*************** INCREMENTAL SCHEDULING CODE STARTS HERE ***************/
824
825
826 static void     gst_schedule_class_init (GstScheduleClass *klass);
827 static void     gst_schedule_init       (GstSchedule *schedule);
828
829 static GstObjectClass *parent_class = NULL;
830
831 GType gst_schedule_get_type(void) {
832   static GType schedule_type = 0;
833
834   if (!schedule_type) {
835     static const GTypeInfo schedule_info = {
836       sizeof(GstScheduleClass),
837       NULL,
838       NULL,
839       (GClassInitFunc)gst_schedule_class_init,
840       NULL,
841       NULL,
842       sizeof(GstSchedule),
843       0,
844       (GInstanceInitFunc)gst_schedule_init,
845     };
846     schedule_type = g_type_register_static(GST_TYPE_OBJECT, "GstSchedule", &schedule_info, 0);
847   }
848   return schedule_type;
849 }
850
851 static void
852 gst_schedule_class_init (GstScheduleClass *klass)
853 {
854   parent_class = g_type_class_ref(GST_TYPE_OBJECT);
855 }
856
857 static void
858 gst_schedule_init (GstSchedule *schedule)
859 {
860   schedule->add_element = GST_DEBUG_FUNCPTR(gst_schedule_add_element);
861   schedule->remove_element = GST_DEBUG_FUNCPTR(gst_schedule_remove_element);
862   schedule->enable_element = GST_DEBUG_FUNCPTR(gst_schedule_enable_element);
863   schedule->disable_element = GST_DEBUG_FUNCPTR(gst_schedule_disable_element);
864   schedule->lock_element = GST_DEBUG_FUNCPTR(gst_schedule_lock_element);
865   schedule->unlock_element = GST_DEBUG_FUNCPTR(gst_schedule_unlock_element);
866   schedule->pad_connect = GST_DEBUG_FUNCPTR(gst_schedule_pad_connect);
867   schedule->pad_disconnect = GST_DEBUG_FUNCPTR(gst_schedule_pad_disconnect);
868   schedule->pad_select = GST_DEBUG_FUNCPTR(gst_schedule_pad_select);
869   schedule->iterate = GST_DEBUG_FUNCPTR(gst_schedule_iterate);
870 }
871
872 GstSchedule*
873 gst_schedule_new(GstElement *parent)
874 {
875   GstSchedule *sched = GST_SCHEDULE (g_object_new(GST_TYPE_SCHEDULE,NULL));
876
877   sched->parent = parent;
878
879   return sched;
880 }
881
882
883 /* this function will look at a pad and determine if the peer parent is
884  * a possible candidate for connecting up in the same chain. */
885 /* DEPRACATED !!!!
886 GstElement *gst_schedule_check_pad (GstSchedule *sched, GstPad *pad) {
887   GstRealPad *peer;
888   GstElement *element, *peerelement;
889
890   GST_INFO (GST_CAT_SCHEDULING, "checking pad %s:%s for peer in scheduler",
891             GST_DEBUG_PAD_NAME(pad));
892
893   element = GST_ELEMENT(GST_PAD_PARENT(peer));
894   GST_DEBUG(GST_CAT_SCHEDULING, "element is \"%s\"\n",GST_ELEMENT_NAME(element));
895
896   peer = GST_PAD_PEER (pad);
897   if (peer == NULL) return NULL;
898   peerelement = GST_ELEMENT(GST_PAD_PARENT (peer));
899   if (peerelement == NULL) return NULL;
900   GST_DEBUG(GST_CAT_SCHEDULING, "peer element is \"%s\"\n",GST_ELEMENT_NAME(peerelement));
901
902   // now check to see if it's in the same schedule
903   if (GST_ELEMENT_SCHED(element) == GST_ELEMENT_SCHED(peerelement)) {
904     GST_DEBUG(GST_CAT_SCHEDULING, "peer is in same schedule\n");
905     return peerelement;
906   }
907
908   // otherwise it's not a candidate
909   return NULL;
910 }
911 */
912
913 GstScheduleChain *
914 gst_schedule_chain_new (GstSchedule *sched)
915 {
916   GstScheduleChain *chain = g_new (GstScheduleChain, 1);
917
918   // initialize the chain with sane values
919   chain->sched = sched;
920   chain->disabled = NULL;
921   chain->elements = NULL;
922   chain->num_elements = 0;
923   chain->entry = NULL;
924   chain->cothreaded_elements = 0;
925   chain->schedule = FALSE;
926
927   // add the chain to the schedules' list of chains
928   sched->chains = g_list_prepend (sched->chains, chain);
929   sched->num_chains++;
930
931   GST_INFO (GST_CAT_SCHEDULING, "created new chain %p, now are %d chains in sched %p",
932             chain,sched->num_chains,sched);
933
934   return chain;
935 }
936
937 void
938 gst_schedule_chain_destroy (GstScheduleChain *chain)
939 {
940   GstSchedule *sched = chain->sched;
941
942   // remove the chain from the schedules' list of chains
943   chain->sched->chains = g_list_remove (chain->sched->chains, chain);
944   chain->sched->num_chains--;
945
946   // destroy the chain
947   g_list_free (chain->disabled);        // should be empty...
948   g_list_free (chain->elements);        // ditto
949   g_free (chain);
950
951   GST_INFO (GST_CAT_SCHEDULING, "destroyed chain %p, now are %d chains in sched %p",chain,sched->num_chains,sched);
952 }
953
954 void
955 gst_schedule_chain_add_element (GstScheduleChain *chain, GstElement *element)
956 {
957   GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to chain %p", GST_ELEMENT_NAME (element),chain);
958
959   // set the sched pointer for the element
960   element->sched = chain->sched;
961
962   // add the element to the list of 'disabled' elements
963   chain->disabled = g_list_prepend (chain->disabled, element);
964   chain->num_elements++;
965 }
966
967 void
968 gst_schedule_chain_enable_element (GstScheduleChain *chain, GstElement *element)
969 {
970   GST_INFO (GST_CAT_SCHEDULING, "enabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),chain);
971
972   // remove from disabled list
973   chain->disabled = g_list_remove (chain->disabled, element);
974
975   // add to elements list
976   chain->elements = g_list_prepend (chain->elements, element);
977
978   // reschedule the chain
979   gst_schedule_cothreaded_chain(GST_BIN(chain->sched->parent),chain);
980 }
981
982 void
983 gst_schedule_chain_disable_element (GstScheduleChain *chain, GstElement *element)
984 {
985   GST_INFO (GST_CAT_SCHEDULING, "disabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),chain);
986
987   // remove from elements list
988   chain->elements = g_list_remove (chain->elements, element);
989
990   // add to disabled list
991   chain->disabled = g_list_prepend (chain->disabled, element);
992
993   // reschedule the chain
994 // FIXME this should be done only if manager state != NULL
995 //  gst_schedule_cothreaded_chain(GST_BIN(chain->sched->parent),chain);
996 }
997
998 void
999 gst_schedule_chain_remove_element (GstScheduleChain *chain, GstElement *element)
1000 {
1001   GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from chain %p", GST_ELEMENT_NAME (element),chain);
1002
1003   // if it's active, deactivate it
1004   if (g_list_find (chain->elements, element)) {
1005     gst_schedule_chain_disable_element (chain, element);
1006   }
1007
1008   // remove the element from the list of elements
1009   chain->disabled = g_list_remove (chain->disabled, element);
1010   chain->num_elements--;
1011
1012   // if there are no more elements in the chain, destroy the chain
1013   if (chain->num_elements == 0)
1014     gst_schedule_chain_destroy(chain);
1015
1016   // unset the sched pointer for the element
1017   element->sched = NULL;
1018 }
1019
1020 void
1021 gst_schedule_chain_elements (GstSchedule *sched, GstElement *element1, GstElement *element2)
1022 {
1023   GList *chains;
1024   GstScheduleChain *chain;
1025   GstScheduleChain *chain1 = NULL, *chain2 = NULL;
1026   GstElement *element;
1027
1028   // first find the chains that hold the two 
1029   chains = sched->chains;
1030   while (chains) {
1031     chain = (GstScheduleChain *)(chains->data);
1032     chains = g_list_next(chains);
1033
1034     if (g_list_find (chain->disabled,element1))
1035       chain1 = chain;
1036     else if (g_list_find (chain->elements,element1))
1037       chain1 = chain;
1038
1039     if (g_list_find (chain->disabled,element2))
1040       chain2 = chain;
1041     else if (g_list_find (chain->elements,element2))
1042       chain2 = chain;
1043   }
1044
1045   // first check to see if they're in the same chain, we're done if that's the case
1046   if ((chain1 != NULL) && (chain1 == chain2)) {
1047     GST_INFO (GST_CAT_SCHEDULING, "elements are already in the same chain");
1048     return;
1049   }
1050
1051   // now, if neither element has a chain, create one
1052   if ((chain1 == NULL) && (chain2 == NULL)) {
1053     GST_INFO (GST_CAT_SCHEDULING, "creating new chain to hold two new elements");
1054     chain = gst_schedule_chain_new (sched);
1055     gst_schedule_chain_add_element (chain, element1);
1056     gst_schedule_chain_add_element (chain, element2);
1057     // FIXME chain changed here
1058 //    gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1059
1060   // otherwise if both have chains already, join them
1061   } else if ((chain1 != NULL) && (chain2 != NULL)) {
1062     GST_INFO (GST_CAT_SCHEDULING, "merging chain %p into chain %p",chain2,chain1);
1063     // take the contents of chain2 and merge them into chain1
1064     chain1->disabled = g_list_concat (chain1->disabled, g_list_copy(chain2->disabled));
1065     chain1->elements = g_list_concat (chain1->elements, g_list_copy(chain2->elements));
1066     chain1->num_elements += chain2->num_elements;
1067     // FIXME chain changed here
1068 //    gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1069
1070     gst_schedule_chain_destroy(chain2);
1071
1072   // otherwise one has a chain already, the other doesn't
1073   } else {
1074     // pick out which one has the chain, and which doesn't
1075     if (chain1 != NULL) chain = chain1, element = element2;
1076     else chain = chain2, element = element1;
1077
1078     GST_INFO (GST_CAT_SCHEDULING, "adding element to existing chain");
1079     gst_schedule_chain_add_element (chain, element);
1080     // FIXME chain changed here
1081 //    gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1082   }
1083 }
1084
1085 void
1086 gst_schedule_pad_connect (GstSchedule *sched, GstPad *srcpad, GstPad *sinkpad)
1087 {
1088   GstElement *srcelement,*sinkelement;
1089
1090   srcelement = GST_PAD_PARENT(srcpad);
1091   g_return_if_fail(srcelement != NULL);
1092   sinkelement = GST_PAD_PARENT(sinkpad);
1093   g_return_if_fail(sinkelement != NULL);
1094
1095   GST_INFO (GST_CAT_SCHEDULING, "have pad connected callback on %s:%s to %s:%s",GST_DEBUG_PAD_NAME(srcpad),GST_DEBUG_PAD_NAME(sinkpad));
1096   GST_DEBUG(GST_CAT_SCHEDULING, "srcpad sched is %p, sinkpad sched is %p\n",
1097 GST_ELEMENT_SCHED(srcelement),GST_ELEMENT_SCHED(sinkelement));
1098
1099   if (GST_ELEMENT_SCHED(srcelement) == GST_ELEMENT_SCHED(sinkelement)) {
1100     GST_INFO (GST_CAT_SCHEDULING, "peer %s:%s is in same schedule, chaining together",GST_DEBUG_PAD_NAME(sinkpad));
1101     gst_schedule_chain_elements (sched, srcelement, sinkelement);
1102   }
1103 }
1104
1105 // find the chain within the schedule that holds the element, if any
1106 GstScheduleChain *
1107 gst_schedule_find_chain (GstSchedule *sched, GstElement *element)
1108 {
1109   GList *chains;
1110   GstScheduleChain *chain;
1111
1112   GST_INFO (GST_CAT_SCHEDULING, "searching for element \"%s\" in chains",GST_ELEMENT_NAME(element));
1113
1114   chains = sched->chains;
1115   while (chains) {
1116     chain = (GstScheduleChain *)(chains->data);
1117     chains = g_list_next (chains);
1118
1119     if (g_list_find (chain->elements, element))
1120       return chain;
1121     if (g_list_find (chain->disabled, element))
1122       return chain;
1123   }
1124
1125   return NULL;
1126 }
1127
1128 void
1129 gst_schedule_chain_recursive_add (GstScheduleChain *chain, GstElement *element)
1130 {
1131   GList *pads;
1132   GstPad *pad;
1133   GstElement *peerelement;
1134
1135   // add the element to the chain
1136   gst_schedule_chain_add_element (chain, element);
1137
1138   GST_DEBUG(GST_CAT_SCHEDULING, "recursing on element \"%s\"\n",GST_ELEMENT_NAME(element));
1139   // now go through all the pads and see which peers can be added
1140   pads = element->pads;
1141   while (pads) {
1142     pad = GST_PAD(pads->data);
1143     pads = g_list_next (pads);
1144
1145     GST_DEBUG(GST_CAT_SCHEDULING, "have pad %s:%s, checking for valid peer\n",GST_DEBUG_PAD_NAME(pad));
1146     // if the peer exists and could be in the same chain
1147     if (GST_PAD_PEER(pad)) {
1148       GST_DEBUG(GST_CAT_SCHEDULING, "has peer %s:%s\n",GST_DEBUG_PAD_NAME(GST_PAD_PEER(pad)));
1149       peerelement = GST_PAD_PARENT(GST_PAD_PEER(pad));
1150       if (GST_ELEMENT_SCHED(GST_PAD_PARENT(pad)) == GST_ELEMENT_SCHED(peerelement)) {
1151         GST_DEBUG(GST_CAT_SCHEDULING, "peer \"%s\" is valid for same chain\n",GST_ELEMENT_NAME(peerelement));
1152         // if it's not already in a chain, add it to this one
1153         if (gst_schedule_find_chain (chain->sched, peerelement) == NULL) {
1154           gst_schedule_chain_recursive_add (chain, peerelement);
1155         }
1156       }
1157     }
1158   }
1159 }
1160
1161 void
1162 gst_schedule_pad_disconnect (GstSchedule *sched, GstPad *srcpad, GstPad *sinkpad)
1163 {
1164   GstScheduleChain *chain;
1165   GstElement *element1, *element2;
1166   GstScheduleChain *chain1, *chain2;
1167
1168   GST_INFO (GST_CAT_SCHEDULING, "disconnecting pads %s:%s and %s:%s",
1169             GST_DEBUG_PAD_NAME(srcpad), GST_DEBUG_PAD_NAME(sinkpad));
1170
1171   // we need to have the parent elements of each pad
1172   element1 = GST_ELEMENT(GST_PAD_PARENT(srcpad));
1173   element2 = GST_ELEMENT(GST_PAD_PARENT(sinkpad));
1174
1175   // first task is to remove the old chain they belonged to.
1176   // this can be accomplished by taking either of the elements,
1177   // since they are guaranteed to be in the same chain
1178   // FIXME is it potentially better to make an attempt at splitting cleaner??
1179   chain = gst_schedule_find_chain (sched, element1);
1180   if (chain) {
1181     GST_INFO (GST_CAT_SCHEDULING, "destroying chain");
1182     gst_schedule_chain_destroy (chain);
1183   }
1184
1185   // now create a new chain to hold element1 and build it from scratch
1186   chain1 = gst_schedule_chain_new (sched);
1187   gst_schedule_chain_recursive_add (chain1, element1);
1188
1189   // check the other element to see if it landed in the newly created chain
1190   if (gst_schedule_find_chain (sched, element2) == NULL) {
1191     // if not in chain, create chain and build from scratch
1192     chain2 = gst_schedule_chain_new (sched);
1193     gst_schedule_chain_recursive_add (chain2, element2);
1194   }
1195 }
1196
1197 GstPad*
1198 gst_schedule_pad_select (GstSchedule *sched, GList *padlist)
1199 {
1200   GstPad *pad = NULL;
1201   GList *padlist2 = padlist;
1202   GST_INFO (GST_CAT_SCHEDULING, "performing select");
1203
1204   while (padlist2) {
1205     pad = GST_PAD (padlist2->data);
1206
1207     if (gst_pad_peek (pad)) {
1208       g_print ("found something in pad %s:%s\n", GST_DEBUG_PAD_NAME (pad));
1209       return pad;
1210     }
1211     
1212     padlist2 = g_list_next (padlist2);
1213   }
1214
1215   /* else there is nothing ready to consume, set up the select functions */
1216   while (padlist) {
1217     pad = GST_PAD (padlist->data);
1218
1219     GST_RPAD_CHAINHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_schedule_select_proxy);
1220
1221     padlist = g_list_next (padlist);
1222   }
1223   if (pad != NULL) {
1224     GstRealPad *peer = GST_RPAD_PEER(pad);
1225     
1226     cothread_switch (GST_ELEMENT (GST_PAD_PARENT (peer))->threadstate);
1227
1228     g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)), gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
1229     pad = GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad;
1230
1231     g_assert (pad != NULL);
1232     g_print ("back from select (%s:%s)\n", GST_DEBUG_PAD_NAME (pad));
1233   }
1234   return pad;
1235 }
1236
1237 void
1238 gst_schedule_add_element (GstSchedule *sched, GstElement *element)
1239 {
1240   GList *pads;
1241   GstPad *pad;
1242   GstElement *peerelement;
1243   GstScheduleChain *chain;
1244
1245   g_return_if_fail (element != NULL);
1246   g_return_if_fail (GST_IS_ELEMENT(element));
1247
1248   // if it's already in this schedule, don't bother doing anything
1249   if (GST_ELEMENT_SCHED(element) == sched) return;
1250
1251   GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to schedule",
1252     GST_ELEMENT_NAME(element));
1253
1254   // if the element already has a different scheduler, remove the element from it
1255   if (GST_ELEMENT_SCHED(element)) {
1256     gst_schedule_remove_element(GST_ELEMENT_SCHED(element),element);
1257   }
1258
1259   // set the sched pointer in the element itself
1260   GST_ELEMENT_SCHED(element) = sched;
1261
1262   // only deal with elements after this point, not bins
1263   // exception is made for Bin's that are schedulable, like the autoplugger
1264   if (GST_IS_BIN (element) && !GST_FLAG_IS_SET(element, GST_BIN_SELF_SCHEDULABLE)) return;
1265
1266   // first add it to the list of elements that are to be scheduled
1267   sched->elements = g_list_prepend (sched->elements, element);
1268   sched->num_elements++;
1269
1270   // create a chain to hold it, and add
1271   chain = gst_schedule_chain_new (sched);
1272   gst_schedule_chain_add_element (chain, element);
1273
1274   // set the sched pointer in all the pads
1275   pads = element->pads;
1276   while (pads) {
1277     pad = GST_PAD(pads->data);
1278     pads = g_list_next(pads);
1279
1280     // we only operate on real pads
1281     if (!GST_IS_REAL_PAD(pad)) continue;
1282
1283     // set the pad's sched pointer
1284     gst_pad_set_sched (pad, sched);
1285
1286     // if the peer element exists and is a candidate
1287     if (GST_PAD_PEER(pad)) {
1288       peerelement = GST_PAD_PARENT( GST_PAD_PEER (pad) );
1289       if (GST_ELEMENT_SCHED(element) == GST_ELEMENT_SCHED(peerelement)) {
1290         GST_INFO (GST_CAT_SCHEDULING, "peer is in same schedule, chaining together");
1291         // make sure that the two elements are in the same chain
1292         gst_schedule_chain_elements (sched,element,peerelement);
1293       }
1294     }
1295   }
1296 }
1297
1298 void
1299 gst_schedule_enable_element (GstSchedule *sched, GstElement *element)
1300 {
1301   GstScheduleChain *chain;
1302
1303   // find the chain the element's in
1304   chain = gst_schedule_find_chain (sched, element);
1305
1306   if (chain)
1307     gst_schedule_chain_enable_element (chain, element);
1308   else
1309     GST_INFO (GST_CAT_SCHEDULING, "element not found in any chain, not enabling");
1310 }
1311
1312 void
1313 gst_schedule_disable_element (GstSchedule *sched, GstElement *element)
1314 {
1315   GstScheduleChain *chain;
1316
1317   // find the chain the element is in
1318   chain = gst_schedule_find_chain (sched, element);
1319
1320   // remove it from the chain
1321   if (chain) {
1322     gst_schedule_chain_disable_element(chain,element);
1323   }
1324 }
1325
1326 void
1327 gst_schedule_remove_element (GstSchedule *sched, GstElement *element)
1328 {
1329   GstScheduleChain *chain;
1330
1331   g_return_if_fail (element != NULL);
1332   g_return_if_fail (GST_IS_ELEMENT(element));
1333
1334   if (g_list_find (sched->elements, element)) {
1335     GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from schedule",
1336       GST_ELEMENT_NAME(element));
1337
1338     // find what chain the element is in
1339     chain = gst_schedule_find_chain(sched, element);
1340
1341     // remove it from its chain
1342     gst_schedule_chain_remove_element (chain, element);
1343
1344     // remove it from the list of elements
1345     sched->elements = g_list_remove (sched->elements, element);
1346     sched->num_elements--;
1347
1348     // unset the scheduler pointer in the element
1349     GST_ELEMENT_SCHED(element) = NULL;
1350   }
1351 }
1352
1353 gboolean
1354 gst_schedule_iterate (GstSchedule *sched)
1355 {
1356   GstBin *bin = GST_BIN(sched->parent);
1357   GList *chains;
1358   GstScheduleChain *chain;
1359   GstElement *entry;
1360   gint num_scheduled = 0;
1361   gboolean eos = FALSE;
1362   GList *elements;
1363
1364   GST_DEBUG_ENTER("(\"%s\")", GST_ELEMENT_NAME (bin));
1365
1366   g_return_val_if_fail (bin != NULL, TRUE);
1367   g_return_val_if_fail (GST_IS_BIN (bin), TRUE);
1368 //  g_return_val_if_fail (GST_STATE (bin) == GST_STATE_PLAYING, TRUE);
1369
1370   // step through all the chains
1371   chains = sched->chains;
1372 //  if (chains == NULL) return FALSE;
1373 g_return_val_if_fail (chains != NULL, FALSE);
1374   while (chains) {
1375     chain = (GstScheduleChain *)(chains->data);
1376     chains = g_list_next (chains);
1377
1378 //    if (!chain->need_scheduling) continue;
1379
1380 //    if (chain->need_cothreads) {
1381       // all we really have to do is switch to the first child
1382       // FIXME this should be lots more intelligent about where to start
1383       GST_DEBUG (GST_CAT_DATAFLOW,"starting iteration via cothreads\n");
1384
1385       if (chain->elements) {
1386         entry = NULL; //MattH ADDED?
1387 GST_DEBUG(GST_CAT_SCHEDULING,"there are %d elements in this chain\n",chain->num_elements);
1388         elements = chain->elements;
1389         while (elements) {
1390           entry = GST_ELEMENT(elements->data);
1391           elements = g_list_next(elements);
1392           if (GST_FLAG_IS_SET(entry,GST_ELEMENT_DECOUPLED)) {
1393             GST_DEBUG(GST_CAT_SCHEDULING,"entry \"%s\" is DECOUPLED, skipping\n",GST_ELEMENT_NAME(entry));
1394             entry = NULL;
1395           } else if (GST_FLAG_IS_SET(entry,GST_ELEMENT_NO_ENTRY)) {
1396             GST_DEBUG(GST_CAT_SCHEDULING,"entry \"%s\" is not valid, skipping\n",GST_ELEMENT_NAME(entry));
1397             entry = NULL;
1398           } else
1399             break;
1400         }
1401         if (entry) {
1402           GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
1403           GST_DEBUG (GST_CAT_DATAFLOW,"set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
1404                GST_ELEMENT_NAME (entry),entry);
1405           cothread_switch (entry->threadstate);
1406
1407           // following is a check to see if the chain was interrupted due to a
1408           // top-half state_change().  (i.e., if there's a pending state.)
1409           //
1410           // if it was, return to gstthread.c::gst_thread_main_loop() to
1411           // execute the state change.
1412           GST_DEBUG (GST_CAT_DATAFLOW,"cothread switch ended or interrupted\n");
1413           if (GST_STATE_PENDING(GST_SCHEDULE(sched)->parent) != GST_STATE_VOID_PENDING)
1414           {
1415             GST_DEBUG (GST_CAT_DATAFLOW,"handle pending state %d\n",
1416                        GST_STATE_PENDING(GST_SCHEDULE(sched)->parent));
1417             return 0;
1418           }
1419
1420         } else {
1421           GST_INFO (GST_CAT_DATAFLOW,"NO ENTRY INTO CHAIN!");
1422           //eos = TRUE;
1423         }
1424       } else {
1425         GST_INFO (GST_CAT_DATAFLOW,"NO ENABLED ELEMENTS IN CHAIN!!");
1426         //eos = TRUE;
1427       }
1428
1429 /*                
1430     } else {
1431       GST_DEBUG (GST_CAT_DATAFLOW,"starting iteration via chain-functions\n");
1432
1433       entries = chain->entries;
1434          
1435       g_assert (entries != NULL);
1436      
1437       while (entries) {
1438         entry = GST_ELEMENT (entries->data);
1439         entries = g_list_next (entries);
1440  
1441         GST_DEBUG (GST_CAT_DATAFLOW,"have entry \"%s\"\n",GST_ELEMENT_NAME (entry));
1442   
1443         if (GST_IS_BIN (entry)) {
1444           gst_bin_iterate (GST_BIN (entry));
1445         } else {
1446           pads = entry->pads;
1447           while (pads) {
1448             pad = GST_PAD (pads->data);
1449             if (GST_RPAD_DIRECTION(pad) == GST_PAD_SRC) {
1450               GST_DEBUG (GST_CAT_DATAFLOW,"calling getfunc of %s:%s\n",GST_DEBUG_PAD_NAME(pad));
1451               if (GST_REAL_PAD(pad)->getfunc == NULL) 
1452                 fprintf(stderr, "error, no getfunc in \"%s\"\n", GST_ELEMENT_NAME  (entry));
1453               else
1454                 buf = (GST_REAL_PAD(pad)->getfunc)(pad);
1455               if (buf) gst_pad_push(pad,buf);
1456             }
1457             pads = g_list_next (pads);
1458           }
1459         }
1460       }
1461     }*/
1462     num_scheduled++;
1463   }
1464
1465 /*
1466   // check if nothing was scheduled that was ours..
1467   if (!num_scheduled) {
1468     // are there any other elements that are still busy?
1469     if (bin->num_eos_providers) {
1470       GST_LOCK (bin);
1471       GST_DEBUG (GST_CATA_DATAFLOW,"waiting for eos providers\n");
1472       g_cond_wait (bin->eoscond, GST_OBJECT(bin)->lock);  
1473       GST_DEBUG (GST_CAT_DATAFLOW,"num eos providers %d\n", bin->num_eos_providers);
1474       GST_UNLOCK (bin);
1475     }
1476     else {      
1477       gst_element_signal_eos (GST_ELEMENT (bin));
1478       eos = TRUE;
1479     }       
1480   }
1481 */
1482
1483   GST_DEBUG (GST_CAT_DATAFLOW, "leaving (%s)\n", GST_ELEMENT_NAME (bin));
1484   return !eos;
1485 }
1486
1487
1488
1489 void
1490 gst_schedule_show (GstSchedule *sched)
1491 {
1492   GList *chains, *elements;
1493   GstElement *element;
1494   GstScheduleChain *chain;
1495
1496   if (sched == NULL) {
1497     g_print("schedule doesn't exist for this element\n");
1498     return;
1499   }
1500
1501   g_return_if_fail(GST_IS_SCHEDULE(sched));
1502
1503   g_print("SCHEDULE DUMP FOR MANAGING BIN \"%s\"\n",GST_ELEMENT_NAME(sched->parent));
1504
1505   g_print("schedule has %d elements in it: ",sched->num_elements);
1506   elements = sched->elements;
1507   while (elements) {
1508     element = GST_ELEMENT(elements->data);
1509     elements = g_list_next(elements);
1510
1511     g_print("%s, ",GST_ELEMENT_NAME(element));
1512   }
1513   g_print("\n");
1514
1515   g_print("schedule has %d chains in it\n",sched->num_chains);
1516   chains = sched->chains;
1517   while (chains) {
1518     chain = (GstScheduleChain *)(chains->data);
1519     chains = g_list_next(chains);
1520
1521     g_print("%p: ",chain);
1522
1523     elements = chain->disabled;
1524     while (elements) {
1525       element = GST_ELEMENT(elements->data);
1526       elements = g_list_next(elements);
1527
1528       g_print("!%s, ",GST_ELEMENT_NAME(element));
1529     }
1530
1531     elements = chain->elements;
1532     while (elements) {
1533       element = GST_ELEMENT(elements->data);
1534       elements = g_list_next(elements);
1535
1536       g_print("%s, ",GST_ELEMENT_NAME(element));
1537     }
1538     g_print("\n");
1539   }
1540 }